From f5100d282559d7b1d8b093fac521cc39091a058b Mon Sep 17 00:00:00 2001
From: jrybicki-jsc <j.rybicki@fz-juelich.de>
Date: Fri, 1 Apr 2022 10:27:22 +0200
Subject: [PATCH] optional registration in upload

---
 dags/just_reg.py      | 73 +++++++++++++++++++++++++++++++++++++++++++
 dags/uploadflow.py    | 46 ++++++++++++++++++++++++---
 docs/apirequests.adoc |  2 ++
 3 files changed, 117 insertions(+), 4 deletions(-)
 create mode 100644 dags/just_reg.py

diff --git a/dags/just_reg.py b/dags/just_reg.py
new file mode 100644
index 0000000..ac7f33c
--- /dev/null
+++ b/dags/just_reg.py
@@ -0,0 +1,73 @@
+from airflow.decorators import dag, task
+from airflow.models.connection import Connection
+from airflow.operators.bash import BashOperator
+from airflow.providers.http.hooks.http import HttpHook
+from airflow.utils.dates import days_ago
+
+
+default_args = {  # handed to @dag(default_args=...) for the example DAG defined in this file
+    'owner': 'airflow',
+}
+
+
+def get_record(name, url):
+    """Build the record dict that is sent when registering a dataset.
+
+    Returns a dict with the given `name` and `url` plus fixed author metadata.
+    """
+    # The author is constant; only name and url vary between records.
+    metadata = {"author": "DLS on behalf of eFlows"}
+    return {"name": name, "url": url, "metadata": metadata}
+
+def get_parameter(parameter, default=False, **kwargs):
+    """Return `parameter` from the `params` mapping in kwargs, or `default` if absent."""
+    return (kwargs.get('params') or {}).get(parameter, default)
+
+@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
+def datacat_registration_example():
+    """Example DAG: a bash `ls` step followed by an optional datacat registration."""
+
+    @task()
+    def register(object_url, **kwargs):
+        """Register `object_url` in datacat if the run's 'register' param is set.
+
+        Returns 0 if skipped, -1 on failure, else the first item of datacat's reply.
+        """
+        reg = get_parameter(parameter='register', default=False, **kwargs)
+        if not reg:
+            print("Skipping registration as 'register' parameter is not set")
+            return 0
+
+        connection = Connection.get_connection_from_secrets('datacat_connection')
+        # Log only the host: get_uri() embeds login and password in the URI.
+        print(f"Registring\n\t{object_url}\n with\n\t {connection.host}")
+
+        # auth_type returns None so the hook does not apply HTTP basic auth
+        hook = HttpHook(http_conn_id='datacat_connection', auth_type=lambda x, y: None)
+        res = hook.run(endpoint='token',
+                       data={'username': connection.login, 'password': connection.password})
+
+        if res.status_code != 200:
+            print("Unable to authenticate. Breaking. Check `datacat_connection` for creds")
+            return -1
+
+        token = res.json()['access_token']
+        auth_header = {'Authorization': f"Bearer {token}"}
+        r = hook.run(endpoint='dataset', headers=auth_header,
+                     json=get_record(name=f"DLS results {kwargs['run_id']}", url=object_url))
+        # NOTE(review): hook.run may raise on error statuses before these checks run - confirm.
+        if r.status_code == 200:
+            d_id = r.json()[0]
+            print(f"Registered sucesfully: {hook.base_url}/dataset/{d_id}")
+            return d_id
+        print(f"Registraton failed: {r.text}")
+        return -1
+
+    step1 = BashOperator(bash_command='ls', task_id='nothing')
+    step2 = register(
+        object_url='https://b2share-testing.fz-juelich.de/records/7a12fda26b2a4d248f96d012d54769b7')
+
+    step1 >> step2
+
+
+dag = datacat_registration_example()  # call the @dag-decorated function and keep the result at module level
diff --git a/dags/uploadflow.py b/dags/uploadflow.py
index ff70aff..1ae7b04 100644
--- a/dags/uploadflow.py
+++ b/dags/uploadflow.py
@@ -3,15 +3,16 @@ import os
 import tempfile
 
 from airflow.decorators import dag, task
+from airflow.models import Variable
 from airflow.models.connection import Connection
 from airflow.operators.python import PythonOperator
 from airflow.providers.http.hooks.http import HttpHook
 from airflow.utils.dates import days_ago
-from airflow.models import Variable
 
 from b2shareoperator import (add_file, create_draft_record, get_community,
                              submit_draft)
-from decors import remove, setup, get_connection
+from decors import get_connection, remove, setup
+from just_reg import get_parameter, get_record
 
 default_args = {
     'owner': 'airflow',
@@ -99,9 +100,44 @@ def upload_example():
 
         print("Submitting record for pubication")
         submitted = submit_draft(record=r, token=token)
-        print(f"Record created {submitted['id']}")
+        print(f"Record created {submitted}")
+
+        return submitted['links']['self']
+
+    @task()
+    def register(object_url, **kwargs):
+        """Register `object_url` in datacat if the run's 'register' param is set.
+
+        Returns 0 if skipped, -1 on failure, else the first item of datacat's reply.
+        """
+        reg = get_parameter(parameter='register', default=False, **kwargs)
+        if not reg:
+            print("Skipping registration as 'register' parameter is not set")
+            return 0
+
+        connection = Connection.get_connection_from_secrets('datacat')
+        # Log only the host: get_uri() embeds login and password in the URI.
+        print(f"Registring\n\t{object_url}\n with\n\t {connection.host}")
+        # auth_type returns None so the hook does not apply HTTP basic auth
+        hook = HttpHook(http_conn_id='datacat', auth_type=lambda x, y: None)
+        res = hook.run(endpoint='token',
+                       data={'username': connection.login, 'password': connection.password})
+        if res.status_code != 200:
+            print("Unable to authenticate. Breaking. Check `datacat` for creds")
+            return -1
 
-        return submitted['id']
+        token = res.json()['access_token']
+        auth_header = {'Authorization': f"Bearer {token}"}
+
+        r = hook.run(endpoint='dataset', headers=auth_header,
+                     json=get_record(name=f"DLS results {kwargs['run_id']}", url=object_url))
+        # NOTE(review): hook.run may raise on error statuses before these checks run - confirm.
+        if r.status_code == 200:
+            d_id = r.json()[0]
+            print(f"Registered sucesfully: {hook.base_url}/dataset/{d_id}")
+            return d_id
+        print(f"Registraton failed: {r.text}")
+        return -1
 
 
 
@@ -114,6 +150,8 @@ def upload_example():
     en = PythonOperator(python_callable=remove, op_kwargs={
                         'conn_id': a_id}, task_id='cleanup')
 
+    reg = register(object_url=uid)
+
     setup_task >> files >> uid >> en
 
 
diff --git a/docs/apirequests.adoc b/docs/apirequests.adoc
index 159bd41..d3276c7 100644
--- a/docs/apirequests.adoc
+++ b/docs/apirequests.adoc
@@ -111,6 +111,8 @@ curl -X POST -u USER:PASS -H "Content-Type: application/json" \
    $DLS/dags/upload_example/dagRuns
 ----
 
+Optionally, the record created in b2share can be registered with data cat. To enable this, provide the 'register' parameter set to 'true'. This operation requires a connection named 'datacat' that holds the data cat credentials.
+
 
 === Image transfer example ===
 To transfer images from eFlows4HPC image build service use dag defined in +dags/image_transfer.py+ (transfer_image). It requires two parameters +image_id+ name of the image in the image
-- 
GitLab