Skip to content
Snippets Groups Projects
Commit f5100d28 authored by Jedrzej Rybicki's avatar Jedrzej Rybicki
Browse files

optional registration in upload

parent 68360d60
No related branches found
No related tags found
No related merge requests found
Pipeline #96724 passed
from airflow.decorators import dag, task
from airflow.models.connection import Connection
from airflow.operators.bash import BashOperator
from airflow.providers.http.hooks.http import HttpHook
from airflow.utils.dates import days_ago
# Passed as `default_args` to the `@dag` decorator of the DAG defined in this file.
default_args = {
'owner': 'airflow',
}
def get_record(name, url, author="DLS on behalf of eFlows"):
    """Build the record payload that is posted to the ``dataset`` endpoint.

    Args:
        name: Name stored under the ``name`` key of the record.
        url: Location of the object, stored under the ``url`` key.
        author: Value stored under ``metadata.author``. Defaults to the
            previously hard-coded string, so existing two-argument callers
            get exactly the same payload as before.

    Returns:
        A dict with the keys ``name``, ``url`` and ``metadata``.
    """
    return {
        "name": name,
        "url": url,
        "metadata": {
            "author": author,
        },
    }
def get_parameter(parameter, default=False, **kwargs):
    """Return a single entry of the ``params`` mapping found in ``kwargs``.

    Args:
        parameter: Key to look up in ``kwargs['params']``.
        default: Value returned when the key (or the whole ``params``
            mapping) is absent.
        **kwargs: Keyword arguments forwarded by the caller; the ``params``
            entry is the only one that is read.

    Returns:
        ``kwargs['params'][parameter]`` when present, otherwise ``default``.
    """
    # A missing or ``None`` ``params`` entry is treated like an empty mapping
    # instead of raising KeyError/AttributeError.
    params = kwargs.get('params') or {}
    return params.get(parameter, default)
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def datacat_registration_example():
    """Example DAG: run a no-op bash step, then optionally register a URL.

    The registration step only does work when the DAG run is started with
    the ``register`` parameter set to a truthy value.
    """

    @task()
    def register(object_url, **kwargs):
        """Register ``object_url`` as a dataset via the datacat connection.

        Args:
            object_url: URL of the object to register.
            **kwargs: Task context; ``params`` and ``run_id`` are read.

        Returns:
            ``0`` when registration is skipped, ``-1`` when authentication
            or registration does not answer with HTTP 200, otherwise the
            first element of the JSON body returned by the ``dataset``
            endpoint.
        """
        # Single source of truth for the connection id, so the lookup, the
        # hook and the error message can never disagree.
        conn_id = 'datacat_connection'

        reg = get_parameter(parameter='register', default=False, **kwargs)
        if not reg:
            print("Skipping registration as 'register' parameter is not set")
            return 0

        connection = Connection.get_connection_from_secrets(conn_id)
        # Log only the host: `Connection.get_uri()` embeds login and password
        # and must not be written to the task log.
        server = connection.host
        print(f"Registring\n\t{object_url}\n with\n\t {server}")

        # auth_type empty to overwrite http basic auth
        hook = HttpHook(http_conn_id=conn_id, auth_type=lambda x, y: None)

        # Step 1: exchange the stored credentials for a bearer token.
        res = hook.run(
            endpoint='token',
            data={'username': connection.login, 'password': connection.password},
        )
        if res.status_code != 200:
            print(f"Unable to authenticate. Breaking. Check `{conn_id}` for creds")
            return -1

        token = res.json()['access_token']
        auth_header = {'Authorization': f"Bearer {token}"}

        # Step 2: post the dataset record using the token.
        r = hook.run(
            endpoint='dataset',
            headers=auth_header,
            json=get_record(name=f"DLS results {kwargs['run_id']}", url=object_url),
        )
        if r.status_code == 200:
            d_id = r.json()[0]
            print(f"Registered sucesfully: {hook.base_url}/dataset/{d_id}")
            return d_id

        print(f"Registraton failed: {r.text}")
        return -1

    step1 = BashOperator(bash_command='ls', task_id='nothing')
    step2 = register(
        object_url='https://b2share-testing.fz-juelich.de/records/7a12fda26b2a4d248f96d012d54769b7')
    step1 >> step2


# Instantiate the DAG at module level so it is bound to a module-level name.
dag = datacat_registration_example()
......@@ -3,15 +3,16 @@ import os
import tempfile
from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.models.connection import Connection
from airflow.operators.python import PythonOperator
from airflow.providers.http.hooks.http import HttpHook
from airflow.utils.dates import days_ago
from airflow.models import Variable
from b2shareoperator import (add_file, create_draft_record, get_community,
submit_draft)
from decors import remove, setup, get_connection
from decors import get_connection, remove, setup
from just_reg import get_parameter, get_record
default_args = {
'owner': 'airflow',
......@@ -99,9 +100,44 @@ def upload_example():
print("Submitting record for pubication")
submitted = submit_draft(record=r, token=token)
print(f"Record created {submitted['id']}")
print(f"Record created {submitted}")
return submitted['links']['self']
@task()
def register(object_url, **kwargs):
    """Optionally register ``object_url`` as a dataset via ``datacat``.

    Args:
        object_url: URL of the object to register.
        **kwargs: Task context; ``params`` and ``run_id`` are read.

    Returns:
        ``0`` when the ``register`` parameter is not set, ``-1`` when
        authentication or registration does not answer with HTTP 200,
        otherwise the first element of the JSON body returned by the
        ``dataset`` endpoint.
    """
    reg = get_parameter(parameter='register', default=False, **kwargs)
    if not reg:
        print("Skipping registration as 'register' parameter is not set")
        return 0

    connection = Connection.get_connection_from_secrets('datacat')
    # Log only the host: `Connection.get_uri()` embeds login and password
    # and must not be written to the task log.
    server = connection.host
    print(f"Registring\n\t{object_url}\n with\n\t {server}")

    # auth_type empty to overwrite http basic auth
    hook = HttpHook(http_conn_id='datacat', auth_type=lambda x, y: None)

    # Step 1: exchange the stored credentials for a bearer token.
    res = hook.run(
        endpoint='token',
        data={'username': connection.login, 'password': connection.password},
    )
    if res.status_code != 200:
        print("Unable to authenticate. Breaking. Check `datacat` for creds")
        # The stray `return submitted['id']` that followed here was dropped:
        # `submitted` is not defined in this function.
        return -1

    token = res.json()['access_token']
    auth_header = {'Authorization': f"Bearer {token}"}

    # Step 2: post the dataset record using the token.
    r = hook.run(
        endpoint='dataset',
        headers=auth_header,
        json=get_record(name=f"DLS results {kwargs['run_id']}", url=object_url),
    )
    if r.status_code == 200:
        d_id = r.json()[0]
        print(f"Registered sucesfully: {hook.base_url}/dataset/{d_id}")
        return d_id

    print(f"Registraton failed: {r.text}")
    return -1
......@@ -114,6 +150,8 @@ def upload_example():
en = PythonOperator(python_callable=remove, op_kwargs={
'conn_id': a_id}, task_id='cleanup')
reg = register(object_url=uid)
setup_task >> files >> uid >> en
......
......@@ -111,6 +111,8 @@ curl -X POST -u USER:PASS -H "Content-Type: application/json" \
$DLS/dags/upload_example/dagRuns
----
Optionally, the record created in b2share can be registered with data cat. This is achieved by providing the 'register' parameter set to 'true'. This operation requires a connection to data cat, with credentials, named 'datacat'.
=== Image transfer example ===
To transfer images from the eFlows4HPC image build service, use the DAG defined in +dags/image_transfer.py+ (transfer_image). It requires two parameters: +image_id+, the name of the image in the image
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment