From 810c16d6de6255ec1d4000aa89e343fcba7165c2 Mon Sep 17 00:00:00 2001 From: jrybicki-jsc <j.rybicki@fz-juelich.de> Date: Tue, 31 Aug 2021 14:08:23 +0200 Subject: [PATCH] wip: ssh to the target --- dags/taskflow.py | 11 ++++++++++- requirements.txt | 1 + 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/dags/taskflow.py b/dags/taskflow.py index 15c4efc..e45ed47 100644 --- a/dags/taskflow.py +++ b/dags/taskflow.py @@ -3,6 +3,8 @@ from airflow.decorators import dag, task from airflow.utils.dates import days_ago from airflow.models.connection import Connection from airflow.models.dagrun import DagRun +from airflow.providers.ssh.hooks.ssh import SSHHook + import requests import urllib.request import tempfile @@ -47,10 +49,17 @@ def taskflow_example(**kwargs): @task() def load(files: dict): print(f"Total files downloaded: {len(files)}") + ssh_hook = SSHHook(ssh_conn_id='default_ssh') + with ssh_hook.get_conn() as ssh_client: + sftp_client = ssh_client.open_sftp() + for [local, remote] in files.items(): + sftp_client.put(local, f"/tmp/{remote}") + + - data = extract() files = transform(data) load(files) dag = taskflow_example() + diff --git a/requirements.txt b/requirements.txt index 05b86e9..1f88981 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ requests urllib3==1.26.6 +apache-airflow-providers-ssh \ No newline at end of file -- GitLab