From d582f3a30f0f830f741e91084b227d9c39de0639 Mon Sep 17 00:00:00 2001 From: jrybicki-jsc <j.rybicki@fz-juelich.de> Date: Mon, 13 Sep 2021 09:15:23 +0200 Subject: [PATCH] b2share operator using more airflowish hook --- dags/b2shareoperator.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/dags/b2shareoperator.py b/dags/b2shareoperator.py index f56f849..cdc40e4 100644 --- a/dags/b2shareoperator.py +++ b/dags/b2shareoperator.py @@ -1,5 +1,6 @@ from airflow.models.baseoperator import BaseOperator from airflow.models.connection import Connection +from airflow.providers.http.hooks.http import HttpHook import requests from urllib.parse import urljoin import tempfile @@ -42,19 +43,15 @@ class B2ShareOperator(BaseOperator): print(self.target_dir) def execute(self, **kwargs): - connection = Connection.get_connection_from_secrets('default_b2share') - server = connection.get_uri() - print(f"Rereiving data from {server}") - - print('Kwargs') - print(kwargs) + hook = HttpHook(http_conn_id=self.conn_id, method='GET') params = kwargs['context']['params'] oid = params['oid'] - obj = get_object_md(server=server, oid=oid) - print(f"Retrieved object {oid}: {obj}") - flist = get_file_list(obj) + hrespo = hook.run(endpoint=f"/api/records/{oid}") + print(hrespo) + flist = get_file_list(hrespo.json()) + print(flist) ti = kwargs['context']['ti'] name_mappings = {} for fname, url in flist.items(): @@ -66,5 +63,5 @@ class B2ShareOperator(BaseOperator): ti.xcom_push(key='remote', value=fname) break # for now only one file - + ti.xcom_push(key='mappins', value=name_mappings) return len(name_mappings) -- GitLab