Skip to content
Snippets Groups Projects
conn_deco.py 1.30 KiB
from datetime import timedelta

from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

from decors import get_connection, remove, setup

def_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}


@dag(default_args=def_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def conn_decorator():

    @task()
    def doing_nothing(conn_id, **kwargs):
        print(f"Using connection {conn_id}")

        ssh_hook = get_connection(conn_id=conn_id, **kwargs)
        with ssh_hook.get_conn() as ssh_client:
            sftp_client = ssh_client.open_sftp()
            print("Connected")
            lst = sftp_client.listdir(path='/tmp/')
            for f in lst:
                print(f)

        return conn_id

    conn_id = PythonOperator(python_callable=setup, task_id='setup_connection')
    # another way of mixing taskflow and classical api:
    a_id = conn_id.output['return_value']
    dno = doing_nothing(conn_id=a_id)
    en = PythonOperator(python_callable=remove, op_kwargs={
                        'conn_id': dno}, task_id='cleanup')

    conn_id >> dno >> en


dag = conn_decorator()