from datetime import timedelta from airflow import settings from airflow.decorators import dag, task from airflow.providers.ssh.hooks.ssh import SSHHook from airflow.models.connection import Connection from airflow.utils.dates import days_ago def_args = { 'owner': 'airflow', 'depends_on_past': False, 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5) } @dag(default_args=def_args, schedule_interval=None, start_date=days_ago(2), tags=['example']) def conn_decorator(): @task def setup(**kwargs): print(f"Setting up the connection") params = kwargs['params'] rrid = kwargs['run_id'] host = params.get('host') port = params.get('port', 2222) key = params.get('key') user = params.get('login', 'eflows') conn_id = f"tmp_connection_{rrid}" extra = {"private_key": key} conn = Connection( conn_id=conn_id, conn_type='ssh', description='Automatically generated Connection', host=host, login=user, port=port, extra=extra ) session = settings.Session() session.add(conn) session.commit() print(f"Connection {conn_id} created") return conn_id @task() def doing_nothing(conn_id, **kwargs): print(f"Just doing nothing with {conn_id}") params = kwargs['params'] print(f"This task recieved following kwargs: {params}") ssh_hook = SSHHook(ssh_conn_id=conn_id) with ssh_hook.get_conn() as ssh_client: sftp_client = ssh_client.open_sftp() print("Connected") return conn_id @task() def remove(conn_id): print(f"Removing conneciton {conn_id}") session = settings.Session() for con in session.query(Connection).all(): print(con) session.query(Connection).filter(Connection.conn_id == conn_id).delete() session.commit() conn_id = setup() conn_id = doing_nothing(conn_id=conn_id) remove(conn_id) dag = conn_decorator()