diff --git a/README.md b/README.md index 323709ec3608f63bfbb93fdaef1312529e9450ad..fe0d6923d7b0c9dc2cdc42cff5c23dfa6559d778 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,19 @@ docker-compose -f dockers/docker-compose.yaml --project-directory . up -d ``` ## Setup connection + +### B2Share connection +Here we use the testing instance (check the hostname) + ``` -curl -X POST -u creds -H "Content-Type: application/json" --data '{"connection_id": "default_b2share","conn_type":"https", "host": "b2share-testing.fz-juelich.de", "schema":""}' localhost:7001/api/v1/connections +curl -X POST -u creds -H "Content-Type: application/json" --data '{"connection_id": "default_b2share","conn_type":"https", "host": "b2share-testing.fz-juelich.de", "schema":""}' airflow:7001/api/v1/connections ``` ### SSH +Copying to the target goes through scp (example with username/password) + ``` +curl -X POST -u creds -H "Content-Type: application/json" --data '{"connection_id": "default_ssh", "conn_type": "ssh", "host": "ssh", "login": "user", "port": 2222, "password": "pass"}' airflow:7001/api/v1/connections ``` + + diff --git a/dags/b2shareoperator.py b/dags/b2shareoperator.py index f9a4cb3fd36031a3d6313fb875c8373756c6719f..f56f8492bc21ed1b8f177b41c11787fc401fcb3c 100644 --- a/dags/b2shareoperator.py +++ b/dags/b2shareoperator.py @@ -20,33 +20,51 @@ def get_object_md(server, oid): return obj def download_file(url: str, target_dir: str): - fname = tempfile.mktemp(dir=target_dir) urllib.request.urlretrieve(url=url, filename=fname) return fname -server='https://b2share-testing.fz-juelich.de/' class B2ShareOperator(BaseOperator): + template_fields = ('target_dir',) def __init__( self, name: str, - conn_id: str = 'default_b2share', + conn_id: str = 'default_b2share', # 'https://b2share-testing.fz-juelich.de/', + target_dir: str = '/tmp/', **kwargs) -> None: super().__init__(**kwargs) self.name = name self.conn_id = conn_id + self.target_dir = target_dir + print(self.target_dir) + + def execute(self, 
**kwargs): + connection = Connection.get_connection_from_secrets('default_b2share') + server = connection.get_uri() + print(f"Rereiving data from {server}") + + print('Kwargs') print(kwargs) - def execute(self, context): + params = kwargs['context']['params'] + oid = params['oid'] + obj = get_object_md(server=server, oid=oid) + print(f"Retrieved object {oid}: {obj}") + flist = get_file_list(obj) + + ti = kwargs['context']['ti'] + name_mappings = {} + for fname, url in flist.items(): + tmpname = download_file(url=url, target_dir=self.target_dir) + print(f"Processing: {fname} --> {url} --> {tmpname}") - connection = Connection.get_connection_from_secrets(self.conn_id) - print(f"Rereiving data from {connection.get_uri()}") + name_mappings[fname]=tmpname + ti.xcom_push(key='local', value=tmpname) + ti.xcom_push(key='remote', value=fname) + break # for now only one file - lst = get_objects(server=connection.get_uri()) - flist = {o['id']: [f['key'] for f in o['files']] for o in lst} - print(f"GOT: {flist}") - print(self.params) - return len(flist) + + return len(name_mappings) diff --git a/dags/firsto.py b/dags/firsto.py index eb2a6b1e3c7ee8d855ab77c83c0bdd21c95d8c26..abe85fb97f61ba67ca10f6b2fe137eb5db352629 100644 --- a/dags/firsto.py +++ b/dags/firsto.py @@ -5,7 +5,7 @@ from airflow import DAG from airflow.utils.dates import days_ago from airflow.operators.bash import BashOperator - +from airflow.providers.sftp.operators.sftp import SFTPOperator from b2shareoperator import B2ShareOperator def_args = { @@ -19,10 +19,20 @@ def_args = { } with DAG('firsto', default_args=def_args, description='first dag', schedule_interval=timedelta(days=1), start_date=days_ago(2)) as dag: - t1 = BashOperator(task_id='print_date', bash_command='date') - t2 = BashOperator(task_id='do_noting', bash_command='sleep 5') - - t3 = B2ShareOperator(task_id='task_b2sh', dag=dag, name='B2Share') - t1 >> t2 >> t3 + get_b2obj = B2ShareOperator(task_id='task_b2sh', + dag=dag, + name='B2Share', + 
target_dir="{{ var.value.source_path}}") + + put_file = SFTPOperator( + task_id="upload_scp", + ssh_conn_id="default_ssh", + local_filepath="{{ti.xcom_pull(task_ids='task_b2sh', key='local')}}", + remote_filepath="{{ti.xcom_pull(task_ids='task_b2sh',key='remote')}}", + operation="put", + create_intermediate_dirs=True, + dag=dag) + + get_b2obj >> put_file diff --git a/dags/taskflow.py b/dags/taskflow.py index e45ed47a3e2b830c426f1b27e02b3b2e9ce2c15b..9b7b4deedb2e2134bf49c1c2c8f6bf25125b8188 100644 --- a/dags/taskflow.py +++ b/dags/taskflow.py @@ -23,7 +23,7 @@ def taskflow_example(**kwargs): print(f"Rereiving data from {server}") params = kwargs['params'] - if 'oid' not in params: + if 'oid' not in params: #{"oid":"b38609df2b334ea296ea1857e568dbea"} print(f"Missing object id in pipeline parameters") lst = get_objects(server=server) flist = {o['id']: [f['key'] for f in o['files']] for o in lst} @@ -52,8 +52,8 @@ def taskflow_example(**kwargs): ssh_hook = SSHHook(ssh_conn_id='default_ssh') with ssh_hook.get_conn() as ssh_client: sftp_client = ssh_client.open_sftp() - for [local, remote] in files.items(): - sftp_client.put(local, f"/tmp/{remote}") + for [truename, local] in files.items(): + sftp_client.put(local, f"/tmp/{truename}") diff --git a/tests/test_dag.py b/tests/test_dag.py index 8a1faa1b7b3deb63adcb7bb56a2342abd72adcdd..0f26de3d2d68de22f19cfd6512fab18b53864158 100644 --- a/tests/test_dag.py +++ b/tests/test_dag.py @@ -10,4 +10,4 @@ class TestADag(unittest.TestCase): dag = self.dagbag.get_dag(dag_id='firsto') assert self.dagbag.import_errors == {} assert dag is not None - self.assertEqual(len(dag.tasks), 3, f"Actually: {len(dag.tasks)}") \ No newline at end of file + self.assertEqual(len(dag.tasks), 2, f"Actually: {len(dag.tasks)}") \ No newline at end of file