From 7dab1ff690dfc1e40510a0be5b8d26cf473def9a Mon Sep 17 00:00:00 2001 From: jrybicki-jsc <j.rybicki@fz-juelich.de> Date: Mon, 14 Mar 2022 12:22:17 +0100 Subject: [PATCH] connection id --- dags/taskflow.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dags/taskflow.py b/dags/taskflow.py index 810dd07..5d9ee15 100644 --- a/dags/taskflow.py +++ b/dags/taskflow.py @@ -58,7 +58,7 @@ def taskflow_example(): target = params.get('target', '/tmp/') print(f"Using {connection_id} connection") - ssh_hook = get_connection(conn_id=conn_id) + ssh_hook = get_connection(conn_id=connection_id) with ssh_hook.get_conn() as ssh_client: sftp_client = ssh_client.open_sftp() @@ -79,8 +79,9 @@ def taskflow_example(): files = transform(flist=data) ucid = load(connection_id=a_id, files=files) + #b_id = ucid.output['return_value'] en = PythonOperator(python_callable=remove, op_kwargs={ - 'conn_id': a_id}, task_id='cleanup') + 'conn_id': ucid}, task_id='cleanup') conn_id >> data >> files >> ucid >> en -- GitLab