# Patch: add an SFTP upload step to the `load` task in dags/taskflow.py and
# declare the SSH provider package in requirements.txt.
#
# Review notes for this revision of the patch:
# - The unified-diff layout (one record per line) has been restored.
#   Indentation and the position of blank context lines in the second hunk
#   are reconstructed from the hunk counts and Python syntax -- TODO confirm
#   against the base file before applying.
# - The SFTP client is now opened in a `with` block so it is closed
#   explicitly, and the loop uses plain tuple unpacking.
# - requirements.txt now ends with a newline.
# - The post-image blob ids on the `index` lines predate these review edits.
diff --git a/dags/taskflow.py b/dags/taskflow.py
index 15c4efc5375e475ca7bb83d4ffdfc8aec06269eb..e45ed47a3e2b830c426f1b27e02b3b2e9ce2c15b 100644
--- a/dags/taskflow.py
+++ b/dags/taskflow.py
@@ -3,6 +3,8 @@ from airflow.decorators import dag, task
 from airflow.utils.dates import days_ago
 from airflow.models.connection import Connection
 from airflow.models.dagrun import DagRun
+from airflow.providers.ssh.hooks.ssh import SSHHook
+
 import requests
 import urllib.request
 import tempfile
@@ -47,10 +49,23 @@ def taskflow_example(**kwargs):
     @task()
     def load(files: dict):
+        """Upload every file in `files` to /tmp on the 'default_ssh' host.
+
+        `files` is iterated as (local path, remote file name) pairs.
+        """
         print(f"Total files downloaded: {len(files)}")
+        ssh_hook = SSHHook(ssh_conn_id='default_ssh')
+        # Both context managers close on exit, so the SFTP channel and the
+        # SSH connection are released even when an upload raises.
+        with ssh_hook.get_conn() as ssh_client:
+            with ssh_client.open_sftp() as sftp_client:
+                for local, remote in files.items():
+                    sftp_client.put(local, f"/tmp/{remote}")
+
+
-
     data = extract()
     files = transform(data)
     load(files)
 
 
 dag = taskflow_example()
+
diff --git a/requirements.txt b/requirements.txt
index 05b86e9419e234ac71546a0db1edaf198c2af835..1f88981a3dcd892637e066ee46b51f044b32017f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,3 @@
 requests
 urllib3==1.26.6
+apache-airflow-providers-ssh