"""Example Airflow DAG that registers a dataset in the data catalog.

The DAG lists a directory, optionally registers a fixed object URL as a
``dataset`` entry through ``DataCatalogHook``, prints a template built from
the metadata of a ``storage_target`` entry, and finally removes a scratch
file.
"""
import json

from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datacat_integration.hooks import DataCatalogHook
from datacat_integration.connection import DataCatalogEntry

default_args = {
    'owner': 'airflow',
}


def get_record(name, url):
    """Return a plain-dict catalog record for *name* and *url*.

    The record carries a fixed ``metadata.author`` value.
    """
    return {
        "name": name,
        "url": url,
        "metadata": {
            "author": "DLS on behalf of eFlows",
        },
    }


def create_template(hrespo):
    """Build a record template dict from a metadata mapping.

    Args:
        hrespo: mapping providing the keys ``title``, ``creator_name``,
            ``description`` and ``open_access``.

    Returns:
        A dict with titles, creators, descriptions, fixed community
        identifiers and a boolean ``open_access`` flag.

    Raises:
        KeyError: if one of the required keys is missing from *hrespo*.
    """
    # Metadata values may arrive as strings ("True"/"true") or as real
    # booleans; normalise before comparing so all of them count as open.
    open_access = str(hrespo['open_access']).strip().lower() == "true"
    return {
        "titles": [{"title": hrespo['title']}],
        "creators": [{"creator_name": hrespo['creator_name']}],
        "descriptions": [
            {
                "description": hrespo['description'],
                "description_type": "Abstract",
            }
        ],
        "community": "2d58eb08-af65-4cad-bd25-92f1a17d325b",
        "community_specific": {
            "90942261-4637-4ac0-97b8-12e1edb38739": {
                "helmholtz centre": ["Forschungszentrum Jülich"]
            }
        },
        "open_access": open_access,
    }


def get_parameter(parameter, default=False, **kwargs):
    """Look up *parameter* in the ``params`` entry of the task context.

    Args:
        parameter: name of the parameter to read.
        default: value returned when the parameter (or ``params`` itself)
            is not available.
        **kwargs: task context; only the ``params`` key is inspected.

    Returns:
        The parameter value, or *default* if it is not set.
    """
    # Tolerate a context without (or with an empty/None) 'params' entry.
    params = kwargs.get('params') or {}
    return params.get(parameter, default)


@dag(default_args=default_args, schedule_interval=None,
     start_date=days_ago(2), tags=['example'])
def datacat_registration_example():
    """Define the example DAG: list, register, fetch template, clean up."""

    @task()
    def register(object_url, **kwargs):
        """Register *object_url* as a ``dataset`` entry in the catalog.

        Registration only happens when the ``register`` DAG parameter is
        truthy.

        Returns:
            ``0`` when registration is skipped, ``-1`` when the catalog
            call raises ``ConnectionError``, otherwise the URL of the
            created entry.
        """
        reg = get_parameter(parameter='register', default=False, **kwargs)
        if not reg:
            print("Skipping registration as 'register' parameter is not set")
            return 0

        hook = DataCatalogHook()
        print("Connected to datacat via hook", hook.list_type('dataset'))

        entry = DataCatalogEntry(
            name=f"DLS results {kwargs['run_id']}",
            url=object_url,
            metadata={
                "author": "DLS on behalf of eFlows",
                "access": "hook-based",
            },
        )
        # Keep the try body limited to the call whose failure is handled.
        try:
            r = hook.create_entry(datacat_type='dataset', entry=entry)
        except ConnectionError as e:
            # Best-effort: report the failure but do not fail the task.
            print('Registration failed', e)
            return -1

        print("Hook registration returned: ", r)
        return f"{hook.base_url}/dataset/{r}"

    @task
    def get_template():
        """Fetch a fixed ``storage_target`` entry and print its template."""
        hook = DataCatalogHook()
        print("Connected to datacat via hook", hook.list_type('dataset'))

        mid = '71e863ac-aee6-4680-a57c-de318530b71e'
        entry = json.loads(
            hook.get_entry(datacat_type='storage_target', oid=mid))
        print(entry)
        print(entry['metadata'])
        print('---')
        print(create_template(entry['metadata']))

    step1 = BashOperator(task_id='jj', bash_command='ls')
    # The download step that would create /work/ww is currently disabled:
    # BashOperator(bash_command='curl -X GET -k https://bscgrid20.bsc.es/image_creation/images/download/wordcount_skylake.sif -o /work/ww', task_id='nothing')
    step2 = register(
        object_url='https://b2share-testing.fz-juelich.de/records/7a12fda26b2a4d248f96d012d54769b7')
    step3 = get_template()
    # '-f' keeps the cleanup from failing the run when /work/ww was never
    # created (the download above is disabled).
    stepr = BashOperator(bash_command='rm -f /work/ww', task_id='remov')

    step1 >> step2 >> step3 >> stepr


dag = datacat_registration_example()