diff --git a/dags/just_reg.py b/dags/just_reg.py index 8d213b6e8f56ae673fc2de6ee88721a5a248da67..7a9bf5cb4eba0d7cfd30f17ebc6eb21f2e1a8ce1 100644 --- a/dags/just_reg.py +++ b/dags/just_reg.py @@ -1,3 +1,4 @@ +from urllib.parse import urljoin from airflow.decorators import dag, task from airflow.models.connection import Connection from airflow.operators.bash import BashOperator @@ -46,22 +47,12 @@ def datacat_registration_example(): ) try: r = hook.create_entry(datacat_type='dataset', entry=entry) - print("Hook registration returned: ", r) + print("Hook registration returned: ", r, urljoin(hook.connection.url, r)) return r - except e: + except ConnectionError as e: print('Registration failed', e) return -1 - #r = hook.run(endpoint='dataset', headers=auth_header, - # json=get_record(name=f"DLS results {kwargs['run_id']}", url=object_url) - # ) - #if r.status_code==200: - # d_id = r.json()[0] - # print(f"Registered sucesfully: {hook.base_url}/dataset/{d_id}") - # return d_id - #print(f"Registraton failed: {r.text}") - #return -1 - step1 = BashOperator(bash_command='ls', task_id='nothing') diff --git a/dags/uploadflow.py b/dags/uploadflow.py index d9b405d36ca45206e3bde9d3a4fb6e55728ab4fb..18c504317ab9ea334934e8be354b55620e793615 100644 --- a/dags/uploadflow.py +++ b/dags/uploadflow.py @@ -1,6 +1,7 @@ import os import tempfile +from urllib.parse import urljoin from airflow.decorators import dag, task from airflow.models import Variable @@ -13,6 +14,8 @@ from b2shareoperator import (add_file, create_draft_record, get_community, submit_draft) from decors import get_connection, remove, setup from just_reg import get_parameter, get_record +from datacat_integration.hooks import DataCatalogHook +from datacat_integration.connection import DataCatalogEntry default_args = { 'owner': 'airflow', @@ -94,6 +97,7 @@ def upload_example(): print(f"Draft record created {r['id']} --> {r['links']['self']}") else: print('Something went wrong with registration', r, r.text) + return -1 for [local, true_name] in files.items(): print(f"Uploading {local} --> {true_name}") @@ -114,36 +118,23 @@ def upload_example(): print("Skipping registration as 'register' parameter is not set") return 0 - connection = Connection.get_connection_from_secrets( - 'datacat') - server = connection.get_uri() - print(f"Registring\n\t{object_url}\n with\n\t {server}") - - # auth_type empty to overwrite http basic auth - hook = HttpHook(http_conn_id='datacat', auth_type=lambda x, y: None) - res = hook.run(endpoint='token', - data={'username': connection.login, 'password': connection.password} - ) - - if res.status_code != 200: - print("Unable to authenticate. Breaking. Check `datacat` for creds") + hook = DataCatalogHook() + print("Connected to datacat via hook", hook.list_type('dataset')) + + entry = DataCatalogEntry(name=f"DLS results {kwargs['run_id']}", + url=object_url, + metadata= { + "author": "DLS on behalf of eFlows", + "access": "hook-based"} + ) + try: + r = hook.create_entry(datacat_type='dataset', entry=entry) + print("Hook registration returned: ", r, urljoin(hook.connection.url, r)) + return r + except ConnectionError as e: + print('Registration failed', e) return -1 - token = res.json()['access_token'] - auth_header = {'Authorization': f"Bearer {token}"} - - r = hook.run(endpoint='dataset', headers=auth_header, - json=get_record(name=f"DLS results {kwargs['run_id']}", url=object_url) - ) - if r.status_code==200: - d_id = r.json()[0] - print(f"Registered sucesfully: {hook.base_url}/dataset/{d_id}") - return d_id - print(f"Registraton failed: {r.text}") - return -1 - - - setup_task = PythonOperator(python_callable=setup, task_id='setup_connection') a_id = setup_task.output['return_value']