diff --git a/dags/taskflow.py b/dags/taskflow.py index f00db9cdd2f990af38354376f9fb38625d28992c..bb4abbf8eefe8041d6321434635688c91af74fa3 100644 --- a/dags/taskflow.py +++ b/dags/taskflow.py @@ -5,10 +5,12 @@ from airflow.models.connection import Connection from airflow.models import Variable from airflow.utils.dates import days_ago import os +from datacat_integration.hooks import DataCatalogHook +import json + from decors import get_connection, remove, setup -from b2shareoperator import (download_file, get_file_list, get_object_md, - get_objects) +from b2shareoperator import (download_file, get_file_list, get_object_md) default_args = { 'owner': 'airflow', @@ -20,22 +22,34 @@ def taskflow_example(): @task(multiple_outputs=True) def extract(conn_id, **kwargs): - connection = Connection.get_connection_from_secrets('default_b2share') - server = connection.get_uri() - print(f"Rereiving data from {server}") - params = kwargs['params'] - if 'oid' not in params: # {"oid":"b38609df2b334ea296ea1857e568dbea"} - print("Missing object id in pipeline parameters") - lst = get_objects(server=server) - flist = {o['id']: [f['key'] for f in o['files']] for o in lst} - print(f"Objects on server: {flist}") - return -1 # non zero exit code is a task failure - + if 'oid' not in params: + print("Missing object id in pipeline parameters. Please provide an id for b2share or data cat id") + return -1 oid = params['oid'] + hook = DataCatalogHook() + try: + entry = json.loads(hook.get_entry('dataset', oid)) + if entry and 'b2share' in entry['url']: + print(f"Got data cat b2share entry: {entry}\nwith url: {entry['url']}") + oid = entry['url'].split('/')[-1] + print(f"Extracted oid {oid}") + else: + print('No entry in data cat or not a b2share entry') + + except: + # falling back to b2share + print("No entry found. Probably a b2share object") + + + connection = Connection.get_connection_from_secrets('default_b2share') + server = connection.get_uri() + print(f"Rereiving data from {server}") + obj = get_object_md(server=server, oid=oid) print(f"Retrieved object {oid}: {obj}") + # check status? flist = get_file_list(obj) return flist diff --git a/dags/taskflow_stream.py b/dags/taskflow_stream.py index 84a4e2caae36c777c1e8c21c3688383cbc9eb7c0..2412e625f09ad1d078ae38349121c26e942dc4c4 100644 --- a/dags/taskflow_stream.py +++ b/dags/taskflow_stream.py @@ -1,3 +1,4 @@ +from importlib.metadata import metadata import os import shutil import requests @@ -6,10 +7,12 @@ from airflow.decorators import dag, task from airflow.models.connection import Connection from airflow.providers.ssh.hooks.ssh import SSHHook from airflow.utils.dates import days_ago +from datacat_integration.hooks import DataCatalogHook +import json -from b2shareoperator import (get_file_list, get_object_md, - get_objects) + +from b2shareoperator import (get_file_list, get_object_md) default_args = { 'owner': 'airflow', @@ -20,20 +23,32 @@ default_args = { def taskflow_stream(): @task(multiple_outputs=True) def get_flist(**kwargs): - connection = Connection.get_connection_from_secrets('default_b2share') - server = connection.get_uri() - print(f"Rereiving data from {server}") - params = kwargs['params'] - if 'oid' not in params: # {"oid":"b38609df2b334ea296ea1857e568dbea"} - print("Missing object id in pipeline parameters") - lst = get_objects(server=server) - flist = {o['id']: [f['key'] for f in o['files']] for o in lst} - print(f"Objects on server: {flist}") + if 'oid' not in params: + print("Missing object id in pipeline parameters. Please provide an id for b2share or data cat id") return -1 - oid = params['oid'] + hook = DataCatalogHook() + try: + entry = json.loads(hook.get_entry('dataset', oid)) + if entry and 'b2share' in entry['url']: + print(f"Got data cat b2share entry: {entry}\nwith url: {entry['url']}") + oid = entry['url'].split('/')[-1] + print(f"Extracted oid {oid}") + else: + print('No entry in data cat or not a b2share entry') + + except: + # falling back to b2share + print("No entry found. Probably a b2share object") + + + connection = Connection.get_connection_from_secrets('default_b2share') + server = connection.get_uri() + print(f"Rereiving data from {server}") + + obj = get_object_md(server=server, oid=oid) print(f"Retrieved object {oid}: {obj}") flist = get_file_list(obj)