diff --git a/dags/just_reg.py b/dags/just_reg.py new file mode 100644 index 0000000000000000000000000000000000000000..ac7f33cf9e88f71cd3b720df2eb1ea9aced5dd5d --- /dev/null +++ b/dags/just_reg.py @@ -0,0 +1,73 @@ +from airflow.decorators import dag, task +from airflow.models.connection import Connection +from airflow.operators.bash import BashOperator +from airflow.providers.http.hooks.http import HttpHook +from airflow.utils.dates import days_ago + + +default_args = { + 'owner': 'airflow', +} + + +def get_record(name, url): + return { + "name": name, + "url": url, + "metadata": { + "author": "DLS on behalf of eFlows", + } + } + +def get_parameter(parameter, default=False, **kwargs): + params = kwargs['params'] + return params.get(parameter, default) + +@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example']) +def datacat_registration_example(): + + @task() + def register(object_url, **kwargs): + reg = get_parameter(parameter='register', default=False, **kwargs) + if not reg: + print("Skipping registration as 'register' parameter is not set") + return 0 + + connection = Connection.get_connection_from_secrets( + 'datacat_connection') + server = connection.get_uri() + print(f"Registring\n\t{object_url}\n with\n\t {server}") + + # auth_type empty to overwrite http basic auth + hook = HttpHook(http_conn_id='datacat_connection', auth_type=lambda x, y: None) + res = hook.run(endpoint='token', + data={'username': connection.login, 'password': connection.password} + ) + + if res.status_code != 200: + print("Unable to authenticate. Breaking. Check `datacat_conneciton` for creds") + return -1 + + token = res.json()['access_token'] + auth_header = {'Authorization': f"Bearer {token}"} + + r = hook.run(endpoint='dataset', headers=auth_header, + json=get_record(name=f"DLS results {kwargs['run_id']}", url=object_url) + ) + if r.status_code==200: + d_id = r.json()[0] + print(f"Registered sucesfully: {hook.base_url}/dataset/{d_id}") + return d_id + print(f"Registraton failed: {r.text}") + return -1 + + + + step1 = BashOperator(bash_command='ls', task_id='nothing') + step2 = register( + object_url='https://b2share-testing.fz-juelich.de/records/7a12fda26b2a4d248f96d012d54769b7') + + step1 >> step2 + + +dag = datacat_registration_example() diff --git a/dags/uploadflow.py b/dags/uploadflow.py index ff70aff2ab6e509fca0baca75d9c8ef55d26ec17..1ae7b04f566cbb1dd55921e2dfebbfbb2bf3216f 100644 --- a/dags/uploadflow.py +++ b/dags/uploadflow.py @@ -3,15 +3,16 @@ import os import tempfile from airflow.decorators import dag, task +from airflow.models import Variable from airflow.models.connection import Connection from airflow.operators.python import PythonOperator from airflow.providers.http.hooks.http import HttpHook from airflow.utils.dates import days_ago -from airflow.models import Variable from b2shareoperator import (add_file, create_draft_record, get_community, submit_draft) -from decors import remove, setup, get_connection +from decors import get_connection, remove, setup +from just_reg import get_parameter, get_record default_args = { 'owner': 'airflow', @@ -99,9 +100,44 @@ def upload_example(): print("Submitting record for pubication") submitted = submit_draft(record=r, token=token) - print(f"Record created {submitted['id']}") + print(f"Record created {submitted}") + + return submitted['links']['self'] + + @task() + def register(object_url, **kwargs): + reg = get_parameter(parameter='register', default=False, **kwargs) + if not reg: + print("Skipping registration as 'register' parameter is not set") + return 0 + + connection = Connection.get_connection_from_secrets( + 'datacat') + server = connection.get_uri() + print(f"Registring\n\t{object_url}\n with\n\t {server}") + + # auth_type empty to overwrite http basic auth + hook = HttpHook(http_conn_id='datacat', auth_type=lambda x, y: None) + res = hook.run(endpoint='token', + data={'username': connection.login, 'password': connection.password} + ) + + if res.status_code != 200: + print("Unable to authenticate. Breaking. Check `datacat` for creds") + return -1 - return submitted['id'] + token = res.json()['access_token'] + auth_header = {'Authorization': f"Bearer {token}"} + + r = hook.run(endpoint='dataset', headers=auth_header, + json=get_record(name=f"DLS results {kwargs['run_id']}", url=object_url) + ) + if r.status_code==200: + d_id = r.json()[0] + print(f"Registered sucesfully: {hook.base_url}/dataset/{d_id}") + return d_id + print(f"Registraton failed: {r.text}") + return -1 @@ -114,6 +150,8 @@ def upload_example(): en = PythonOperator(python_callable=remove, op_kwargs={ 'conn_id': a_id}, task_id='cleanup') + reg = register(object_url=uid) + setup_task >> files >> uid >> en diff --git a/docs/apirequests.adoc b/docs/apirequests.adoc index 159bd412f29aee8d2fde4b619da569abb068c5bc..d3276c76bb3e9c721bc085b9d5e6c720d371131e 100644 --- a/docs/apirequests.adoc +++ b/docs/apirequests.adoc @@ -111,6 +111,8 @@ curl -X POST -u USER:PASS -H "Content-Type: application/json" \ $DLS/dags/upload_example/dagRuns ---- +Optionally, the record created in b2share can be registered with data cat. This is achieved by providing 'register' parameter set to 'true'. This operation requires connection to data cat with credentials and named 'datacat'. + === Image transfer example === To transfer images from eFlows4HPC image build service use dag defined in +dags/image_transfer.py+ (transfer_image). It requires two parameters +image_id+ name of the image in the image