diff --git a/dags/taskflow.py b/dags/taskflow.py index 991c97f86310ebf1350f0cb256e594fd30d17aae..d0bd23182b7f3f90f02f69b4c156d700742e5547 100644 --- a/dags/taskflow.py +++ b/dags/taskflow.py @@ -3,6 +3,7 @@ from airflow.decorators import dag, task from airflow.models.connection import Connection from airflow.providers.ssh.hooks.ssh import SSHHook from airflow.utils.dates import days_ago +import os from b2shareoperator import (download_file, get_file_list, get_object_md, get_objects) @@ -45,13 +46,17 @@ def taskflow_example(): return name_mappings @task() - def load(files: dict): + def load(files: dict, **kwargs): print(f"Total files downloaded: {len(files)}") + params = kwargs['params'] + target = params.get('target', '/tmp/') + ssh_hook = SSHHook(ssh_conn_id='default_ssh') with ssh_hook.get_conn() as ssh_client: sftp_client = ssh_client.open_sftp() for [truename, local] in files.items(): - sftp_client.put(local, f"/tmp/{truename}") + print(f"Copying {local} --> {os.path.join(target, truename)}") + sftp_client.put(local, os.path.join(target, truename)) data = extract() files = transform(data) diff --git a/dockers/docker-compose.yaml b/dockers/docker-compose.yaml index 0c36e660776c05ca81e9313cfbfaf8525f3ba562..727dfc23558a4f4e96015de180e221ac4c10baee 100644 --- a/dockers/docker-compose.yaml +++ b/dockers/docker-compose.yaml @@ -44,7 +44,7 @@ version: '2.4' x-airflow-common: &airflow-common - image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.2} + image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.3} environment: &airflow-common-env AIRFLOW_HOME: /opt/airflow diff --git a/docs/apirequests.adoc b/docs/apirequests.adoc index 329e5d14166233a0b9b4b261e0e54b9e806e958c..5dd6c1b87d1db58be77d6dd05363778cad5d1561 100644 --- a/docs/apirequests.adoc +++ b/docs/apirequests.adoc @@ -28,13 +28,19 @@ curl -X POST -u USER:PASS -H "Content-Type: application/json" \ $DLS/connections ---- +or for key-based access: +---- +curl -X POST -u USER:PASS -H "Content-Type: application/json" --data 
'{"connection_id": "default_ssh", "conn_type": "ssh", "host": "SSH_HOST", "login": "LOGIN", "port": PORT, "extra": "{\"private_key\": \"-----BEGIN OPENSSH PRIVATE KEY-----\\nb3BlbnNzaC1rZXktdjgEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAlwAAAAdzc2gtcn\\nNhAAAAAwEAAQAAAIEAv1w/pNTlbh8Kvmu3+NMt5Kp7yT2FxplbPHi7sQEG80tPNNLb1oDa\\n1JaqKIN1Jq+U1895TTRr1nZBz6rKubssjdQ5H3AcO5ZbNRfiE4tGYbqHPAmpi4kTumQpy5\\nf0JkCHBNuK2LAEOV6pg1jukSmI34Z/HSoBpCCqmDPTQdT9Xq0AAAIIJKwpKCSsKSgAAAAH\\nc3NoLXJzYQAAAIEAv1w/pNTlbh8Kvmu3+NMt5Kp7yT2FxplbPHi7sQEG80tPNNLb1oDa1J\\naqKIN1Jq+U1895TTRr1nZBz6rKubssjdQ5H3AcO5ZbNRfiE4tGYbqHPAmpi4kTumQpy5f0\\nJkCHBNuK2LAEOV6pg1jukSmI34Z/HSoBpCCqmDPTQdT9Xq0AAAADAQABAAAAgEAFdu2IpL\\nGxBQEsPMKstH/6Yau0P5twF0wmAHV5qH+hRIChwxcCyTOyrH8dbAZjW+LP8P9ZeHKp4d6+\\nf1CgRIkhrKj2IYqXgIRUnbH3POBPzmxcEXSYrzc9zOriMhEEdsUraR0C20eFxShyVRHQRv\\nYjnvbYdcZjVnP09TLndZRpAAAAQAXtoENeyFzGxTpNlwqoggeeSvvXTIq8EiLFT8tdF2Lc\\nCXv/6VSDo53f3NmnC45sCNX3/vUq8Hqdu4SHm4y1EGEAAABBAPpNPhi2OvnN6wCiuRMbx1\\nD/nXdtI9LnPwwUmYcKZ+TDIx3mqpyZEJogIKA6gUlG1h1L1VUrtkr/e5XJGLP33ksAAABB\\nAMO3fvQIbOcNiVwKPu1nQG6KrS7y0Uf1O8sb+5kQMGBkJVcyLJTr3fYafOd7Sxo66OYv0b\\nQ649nEtohPPD75u+cAAAARcm9vdEBiY2JiZjhhZDdhMjQBAg==\\n-----END OPENSSH PRIVATE KEY-----\"}"}' \ $DLS/connections ---- === Starting data transfer === To start a transfer following request needs to be sent it includes B2SHARE object id as a parameter. For testing purposes one can use +b38609df2b334ea296ea1857e568dbea+ which -includes one 100MB file. +includes one 100MB file. The +target+ parameter is optional, is passed inside +conf+, and defaults to +/tmp/+. ---- curl -X POST -u USER:PASS -H "Content-Type: application/json" \ - --data '{"conf": {"oid":ID}}' \ + --data '{"conf": {"oid": ID, "target": PATH}}' \ $DLS/dags/taskflow_example/dagRuns ----