-
Jedrzej Rybicki authoredJedrzej Rybicki authored
conn_deco.py 1.30 KiB
from datetime import timedelta
from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from decors import get_connection, remove, setup
def_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
@dag(default_args=def_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def conn_decorator():
@task()
def doing_nothing(conn_id, **kwargs):
print(f"Using connection {conn_id}")
ssh_hook = get_connection(conn_id=conn_id, **kwargs)
with ssh_hook.get_conn() as ssh_client:
sftp_client = ssh_client.open_sftp()
print("Connected")
lst = sftp_client.listdir(path='/tmp/')
for f in lst:
print(f)
return conn_id
conn_id = PythonOperator(python_callable=setup, task_id='setup_connection')
# another way of mixing taskflow and classical api:
a_id = conn_id.output['return_value']
dno = doing_nothing(conn_id=a_id)
en = PythonOperator(python_callable=remove, op_kwargs={
'conn_id': dno}, task_id='cleanup')
conn_id >> dno >> en
dag = conn_decorator()