diff --git a/dags/just_reg.py b/dags/just_reg.py index ac7f33cf9e88f71cd3b720df2eb1ea9aced5dd5d..8d213b6e8f56ae673fc2de6ee88721a5a248da67 100644 --- a/dags/just_reg.py +++ b/dags/just_reg.py @@ -3,6 +3,8 @@ from airflow.models.connection import Connection from airflow.operators.bash import BashOperator from airflow.providers.http.hooks.http import HttpHook from airflow.utils.dates import days_ago +from datacat_integration.hooks import DataCatalogHook +from datacat_integration.connection import DataCatalogEntry default_args = { @@ -33,33 +35,32 @@ def datacat_registration_example(): print("Skipping registration as 'register' parameter is not set") return 0 - connection = Connection.get_connection_from_secrets( - 'datacat_connection') - server = connection.get_uri() - print(f"Registring\n\t{object_url}\n with\n\t {server}") - - # auth_type empty to overwrite http basic auth - hook = HttpHook(http_conn_id='datacat_connection', auth_type=lambda x, y: None) - res = hook.run(endpoint='token', - data={'username': connection.login, 'password': connection.password} - ) - - if res.status_code != 200: - print("Unable to authenticate. Breaking. Check `datacat_conneciton` for creds") + hook = DataCatalogHook() + print("Connected to datacat via hook", hook.list_type('dataset')) + + entry = DataCatalogEntry(name=f"DLS results {kwargs['run_id']}", + url=object_url, + metadata= { + "author": "DLS on behalf of eFlows", + "access": "hook-based"} + ) + try: + r = hook.create_entry(datacat_type='dataset', entry=entry) + print("Hook registration returned: ", r) + return r + except e: + print('Registration failed', e) return -1 - token = res.json()['access_token'] - auth_header = {'Authorization': f"Bearer {token}"} - - r = hook.run(endpoint='dataset', headers=auth_header, - json=get_record(name=f"DLS results {kwargs['run_id']}", url=object_url) - ) - if r.status_code==200: - d_id = r.json()[0] - print(f"Registered sucesfully: {hook.base_url}/dataset/{d_id}") - return d_id - print(f"Registraton failed: {r.text}") - return -1 + #r = hook.run(endpoint='dataset', headers=auth_header, + # json=get_record(name=f"DLS results {kwargs['run_id']}", url=object_url) + # ) + #if r.status_code==200: + # d_id = r.json()[0] + # print(f"Registered sucesfully: {hook.base_url}/dataset/{d_id}") + # return d_id + #print(f"Registraton failed: {r.text}") + #return -1