From abbf29b9024e9a8b7312b9e5a0c59ed6a0a18e0c Mon Sep 17 00:00:00 2001 From: jrybicki-jsc <j.rybicki@fz-juelich.de> Date: Mon, 4 Oct 2021 14:13:26 +0200 Subject: [PATCH] output as DAG parameter and new version of airflow to properly work with ssh keys --- dags/taskflow.py | 12 ++++++++++-- dockers/docker-compose.yaml | 2 +- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/dags/taskflow.py b/dags/taskflow.py index 991c97f..2b9cab2 100644 --- a/dags/taskflow.py +++ b/dags/taskflow.py @@ -3,6 +3,7 @@ from airflow.decorators import dag, task from airflow.models.connection import Connection from airflow.providers.ssh.hooks.ssh import SSHHook from airflow.utils.dates import days_ago +import os from b2shareoperator import (download_file, get_file_list, get_object_md, get_objects) @@ -45,13 +46,20 @@ def taskflow_example(): return name_mappings @task() - def load(files: dict): + def load(files: dict, **kwargs): print(f"Total files downloaded: {len(files)}") + params = kwargs['params'] + if 'target' not in params: + target = '/tmp/' + print(f"Using default target {target}") + else: + target = params['target'] + ssh_hook = SSHHook(ssh_conn_id='default_ssh') with ssh_hook.get_conn() as ssh_client: sftp_client = ssh_client.open_sftp() for [truename, local] in files.items(): - sftp_client.put(local, f"/tmp/{truename}") + sftp_client.put(local, os.path.join(target, truename)) data = extract() files = transform(data) diff --git a/dockers/docker-compose.yaml b/dockers/docker-compose.yaml index eee098e..8b89907 100644 --- a/dockers/docker-compose.yaml +++ b/dockers/docker-compose.yaml @@ -44,7 +44,7 @@ version: '2.4' x-airflow-common: &airflow-common - image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.2} + image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.3} environment: &airflow-common-env AIRFLOW__CORE__EXECUTOR: CeleryExecutor -- GitLab