Skip to content
Snippets Groups Projects
Commit 10d7f8d6 authored by Jedrzej Rybicki
Browse files

registration with datacat hook

parent f0e4fad9
No related branches found
No related tags found
No related merge requests found
Pipeline #97025 passed
from urllib.parse import urljoin
from airflow.decorators import dag, task
from airflow.models.connection import Connection
from airflow.operators.bash import BashOperator
......@@ -46,22 +47,12 @@ def datacat_registration_example():
)
try:
r = hook.create_entry(datacat_type='dataset', entry=entry)
print("Hook registration returned: ", r)
print("Hook registration returned: ", r, urljoin(hook.connection.url, r))
return r
except e:
except ConnectionError as e:
print('Registration failed', e)
return -1
#r = hook.run(endpoint='dataset', headers=auth_header,
# json=get_record(name=f"DLS results {kwargs['run_id']}", url=object_url)
# )
#if r.status_code==200:
# d_id = r.json()[0]
# print(f"Registered sucesfully: {hook.base_url}/dataset/{d_id}")
# return d_id
#print(f"Registraton failed: {r.text}")
#return -1
step1 = BashOperator(bash_command='ls', task_id='nothing')
......
import os
import tempfile
from urllib.parse import urljoin
from airflow.decorators import dag, task
from airflow.models import Variable
......@@ -13,6 +14,8 @@ from b2shareoperator import (add_file, create_draft_record, get_community,
submit_draft)
from decors import get_connection, remove, setup
from just_reg import get_parameter, get_record
from datacat_integration.hooks import DataCatalogHook
from datacat_integration.connection import DataCatalogEntry
default_args = {
'owner': 'airflow',
......@@ -94,6 +97,7 @@ def upload_example():
print(f"Draft record created {r['id']} --> {r['links']['self']}")
else:
print('Something went wrong with registration', r, r.text)
return -1
for [local, true_name] in files.items():
print(f"Uploading {local} --> {true_name}")
......@@ -114,36 +118,23 @@ def upload_example():
print("Skipping registration as 'register' parameter is not set")
return 0
connection = Connection.get_connection_from_secrets(
'datacat')
server = connection.get_uri()
print(f"Registring\n\t{object_url}\n with\n\t {server}")
# auth_type empty to overwrite http basic auth
hook = HttpHook(http_conn_id='datacat', auth_type=lambda x, y: None)
res = hook.run(endpoint='token',
data={'username': connection.login, 'password': connection.password}
)
if res.status_code != 200:
print("Unable to authenticate. Breaking. Check `datacat` for creds")
hook = DataCatalogHook()
print("Connected to datacat via hook", hook.list_type('dataset'))
entry = DataCatalogEntry(name=f"DLS results {kwargs['run_id']}",
url=object_url,
metadata= {
"author": "DLS on behalf of eFlows",
"access": "hook-based"}
)
try:
r = hook.create_entry(datacat_type='dataset', entry=entry)
print("Hook registration returned: ", r, urljoin(hook.connection.url, r))
return r
except ConnectionError as e:
print('Registration failed', e)
return -1
token = res.json()['access_token']
auth_header = {'Authorization': f"Bearer {token}"}
r = hook.run(endpoint='dataset', headers=auth_header,
json=get_record(name=f"DLS results {kwargs['run_id']}", url=object_url)
)
if r.status_code==200:
d_id = r.json()[0]
print(f"Registered sucesfully: {hook.base_url}/dataset/{d_id}")
return d_id
print(f"Registraton failed: {r.text}")
return -1
setup_task = PythonOperator(python_callable=setup, task_id='setup_connection')
a_id = setup_task.output['return_value']
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment