diff --git a/dags/conn_deco.py b/dags/conn_deco.py index b08841550dd16be35a43d44db4b030e8f638ce29..147bc31b47cb0a1f4e17c0db5e3e1c86e17c5386 100644 --- a/dags/conn_deco.py +++ b/dags/conn_deco.py @@ -2,6 +2,7 @@ from datetime import timedelta from airflow import settings from airflow.decorators import dag, task +from airflow.providers.ssh.hooks.ssh import SSHHook from airflow.models.connection import Connection from airflow.utils.dates import days_ago @@ -17,15 +18,16 @@ def_args = { @dag(default_args=def_args, schedule_interval=None, start_date=days_ago(2), tags=['example']) def conn_decorator(): - @task(multiple_outputs=True) + @task def setup(**kwargs): print(f"Setting up the connection") - session = settings.Session() + params = kwargs['params'] rrid = kwargs['run_id'] - oid = params.get('oid', '12121') - key = params.get('key', "1JaqKIN1Jq+\\nf/HSoBpCCqmDPTQdT9Xq0AAAIIJKwpKCSsKSgAAAAH") - user = params.get('user', 'eflows') + host = params.get('host') + port = params.get('port', 2222) + key = params.get('key') + user = params.get('login', 'eflows') conn_id = f"tmp_connection_{rrid}" extra = {"private_key": key} @@ -33,19 +35,29 @@ def conn_decorator(): conn_id=conn_id, conn_type='ssh', description='Automatically generated Connection', - host='openssh-server', + host=host, login=user, - port=2222, + port=port, extra=extra ) + session = settings.Session() session.add(conn) session.commit() - return {'conn_id': conn_id, 'oid': oid} + print(f"Connection {conn_id} created") + return conn_id @task() - def doing_nothing(oid, conn_id): - print(f"Just doing nothing with {oid} and {conn_id}") + def doing_nothing(conn_id, **kwargs): + print(f"Just doing nothing with {conn_id}") + params = kwargs['params'] + print(f"This task recieved following kwargs: {params}") + + ssh_hook = SSHHook(ssh_conn_id=conn_id) + with ssh_hook.get_conn() as ssh_client: + sftp_client = ssh_client.open_sftp() + print("Connected") + return conn_id @task() @@ -55,11 +67,11 @@ def conn_decorator(): for con in session.query(Connection).all(): print(con) - session.remove(Connection.conn_id == conn_id) + session.query(Connection).filter(Connection.conn_id == conn_id).delete() session.commit() - res = setup() - conn_id = doing_nothing(res['oid'], res['conn_id']) + conn_id = setup() + conn_id = doing_nothing(conn_id=conn_id) remove(conn_id)