diff --git a/dags/taskflow.py b/dags/taskflow.py index 810dd07012c951bbedd8cd412e8da811b37a49b8..5d9ee1569262ebe6c5011be28962dfee85fcb653 100644 --- a/dags/taskflow.py +++ b/dags/taskflow.py @@ -58,7 +58,7 @@ def taskflow_example(): target = params.get('target', '/tmp/') print(f"Using {connection_id} connection") - ssh_hook = get_connection(conn_id=conn_id) + ssh_hook = get_connection(conn_id=connection_id) with ssh_hook.get_conn() as ssh_client: sftp_client = ssh_client.open_sftp() @@ -79,8 +79,9 @@ def taskflow_example(): files = transform(flist=data) ucid = load(connection_id=a_id, files=files) + #b_id = ucid.output['return_value'] en = PythonOperator(python_callable=remove, op_kwargs={ - 'conn_id': a_id}, task_id='cleanup') + 'conn_id': ucid}, task_id='cleanup') conn_id >> data >> files >> ucid >> en