diff --git a/dags/taskflow.py b/dags/taskflow.py index 991c97f86310ebf1350f0cb256e594fd30d17aae..2b9cab2eea08c685c7a37c2a30c0e0ad04dc4b3c 100644 --- a/dags/taskflow.py +++ b/dags/taskflow.py @@ -3,6 +3,7 @@ from airflow.decorators import dag, task from airflow.models.connection import Connection from airflow.providers.ssh.hooks.ssh import SSHHook from airflow.utils.dates import days_ago +import os from b2shareoperator import (download_file, get_file_list, get_object_md, get_objects) @@ -45,13 +46,20 @@ def taskflow_example(): return name_mappings @task() - def load(files: dict): + def load(files: dict, **kwargs): print(f"Total files downloaded: {len(files)}") + params = kwargs['params'] + if 'target' not in params: + target = '/tmp/' + print(f"Using default target {target}") + else: + target = params['target'] + ssh_hook = SSHHook(ssh_conn_id='default_ssh') with ssh_hook.get_conn() as ssh_client: sftp_client = ssh_client.open_sftp() for [truename, local] in files.items(): - sftp_client.put(local, f"/tmp/{truename}") + sftp_client.put(local, os.path.join(target, truename)) data = extract() files = transform(data) diff --git a/dockers/docker-compose.yaml b/dockers/docker-compose.yaml index eee098e53fe651f4558b91d75199bb751a955c94..8b89907d5e5dde7e763fbfd54764c01086198d1e 100644 --- a/dockers/docker-compose.yaml +++ b/dockers/docker-compose.yaml @@ -44,7 +44,7 @@ version: '2.4' x-airflow-common: &airflow-common - image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.2} + image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.3} environment: &airflow-common-env AIRFLOW__CORE__EXECUTOR: CeleryExecutor