Skip to content
Snippets Groups Projects
Commit 25a11f3d authored by Jedrzej Rybicki
Browse files

Example of an upload flow with a metadata template taken from the data catalog

parent 54d57fc7
No related branches found
No related tags found
No related merge requests found
......@@ -2,15 +2,16 @@
import os
import tempfile
from airflow import settings
from airflow.decorators import dag, task
from airflow.models.connection import Connection
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.utils.dates import days_ago
from b2shareoperator import (add_file, create_draft_record,
get_record_template, submit_draft)
default_args = {
'owner': 'airflow',
}
......@@ -18,12 +19,40 @@ default_args = {
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def upload_example():
@task
def setup(**kwargs):
    """Register a temporary SSH connection for the current DAG run.

    Reads ``host``, ``port`` (default 2222), ``login`` (default 'eflows') and
    ``key`` from the run's ``params`` and stores them as an Airflow
    ``Connection`` named ``tmp_connection_<run_id>``.

    Returns:
        The id of the newly created connection.
    """
    # Local import: the visible part of the file does not show whether json
    # is imported at module level.
    import json

    print("Setting up the connection")
    params = kwargs['params']
    rrid = kwargs['run_id']
    host = params.get('host')
    port = params.get('port', 2222)
    user = params.get('login', 'eflows')
    key = params.get('key')

    conn_id = f"tmp_connection_{rrid}"
    # Connection.extra is documented as a JSON-encoded string, so serialise
    # the dict explicitly instead of relying on implicit conversion.
    extra = json.dumps({"private_key": key})
    conn = Connection(
        conn_id=conn_id,
        conn_type='ssh',
        description='Automatically generated Connection',
        host=host,
        login=user,
        port=port,
        extra=extra,
    )

    session = settings.Session()
    try:
        session.add(conn)
        session.commit()
    except Exception:
        # Leave the session clean before propagating the failure.
        session.rollback()
        raise
    finally:
        session.close()

    print(f"Connection {conn_id} created")
    return conn_id
@task()
def load(**kwargs):
def load(connection_id, **kwargs):
params = kwargs['params']
target = params.get('target', '/tmp/')
source = params.get('source', '/tmp/')
connection_id = params.get('connection', 'default_ssh')
ssh_hook = SSHHook(ssh_conn_id=connection_id)
with ssh_hook.get_conn() as ssh_client:
......@@ -51,9 +80,29 @@ def upload_example():
# hate such hacks:
server = "https://" + connection.host
token = connection.extra_dejson['access_token']
print(f"Server: {server} + {token}")
template = get_record_template()
params = kwargs['params']
mid = params['mid']
hook = HttpHook(http_conn_id='datacat', method='GET')
hrespo = hook.run(endpoint=f"storage_target/{mid}").json()['metadata']
print(hrespo)
template = {
"titles" : [{"title":hrespo['title']}],
"creators" : [{"creator_name": hrespo['creator_name']}],
"descriptions" :[
{
"description": hrespo['description'],
"description_type": "Abstract"
}
],
"community": "a9217684-945b-4436-8632-cac271f894ed",
'community_specific':
{'91ae5d2a-3848-4693-9f7d-cbd141172ef0': {'helmholtz centre': ['Forschungszentrum Jülich']}},
"open_access": hrespo['open_access']=="True"
}
r = create_draft_record(server=server, token=token, record=template)
print(r)
print(f"Draft record created {r['id']} --> {r['links']['self']}")
......@@ -65,9 +114,22 @@ def upload_example():
print("Submitting record for pubication")
submitted = submit_draft(record=r, token=token)
print(f"Record created {submitted['id']}")
return submitted['id']
@task()
def remove(conn_id, uid):
    """Delete the temporary connection once the upload has finished.

    Args:
        conn_id: id of the Airflow connection to delete.
        uid: identifier of the completed upload; only used in the log line.
    """
    print(f"Upload {uid} completed. Removing conneciton {conn_id}")
    session = settings.Session()
    try:
        # Debug aid kept from the original: list every stored connection.
        for con in session.query(Connection).all():
            print(con)

        session.query(Connection).filter(Connection.conn_id == conn_id).delete()
        session.commit()
    except Exception:
        # Undo the partial delete before propagating the failure.
        session.rollback()
        raise
    finally:
        session.close()
files = load()
upload(files)
conn_id = setup()
files = load(connection_id=conn_id)
uid = upload(files)
remove(conn_id=conn_id, uid=uid)
dag = upload_example()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment