diff --git a/dags/taskflow.py b/dags/taskflow.py index d0bd23182b7f3f90f02f69b4c156d700742e5547..d926e9ca75fd294cdef89abab977705e761b58d8 100644 --- a/dags/taskflow.py +++ b/dags/taskflow.py @@ -50,12 +50,13 @@ def taskflow_example(): print(f"Total files downloaded: {len(files)}") params = kwargs['params'] target = params.get('target', '/tmp/') + connection_id = params.get('connection', 'default_ssh') - ssh_hook = SSHHook(ssh_conn_id='default_ssh') + ssh_hook = SSHHook(ssh_conn_id=connection_id) with ssh_hook.get_conn() as ssh_client: sftp_client = ssh_client.open_sftp() for [truename, local] in files.items(): - print(f"Copying {local} --> {os.path.join(target, truename)}") + print(f"Copying {local} --> {connection_id}:{os.path.join(target, truename)}") sftp_client.put(local, os.path.join(target, truename)) data = extract()