Skip to content
Snippets Groups Projects
Commit bd6dfca2 authored by Jedrzej Rybicki's avatar Jedrzej Rybicki
Browse files

stage-in compatbile with datacat ids

parent d6889f39
Branches
Tags
No related merge requests found
Pipeline #99736 failed
......@@ -5,10 +5,12 @@ from airflow.models.connection import Connection
from airflow.models import Variable
from airflow.utils.dates import days_ago
import os
from datacat_integration.hooks import DataCatalogHook
import json
from decors import get_connection, remove, setup
from b2shareoperator import (download_file, get_file_list, get_object_md,
get_objects)
from b2shareoperator import (download_file, get_file_list, get_object_md)
default_args = {
'owner': 'airflow',
......@@ -20,22 +22,34 @@ def taskflow_example():
@task(multiple_outputs=True)
def extract(conn_id, **kwargs):
params = kwargs['params']
if 'oid' not in params:
print("Missing object id in pipeline parameters. Please provide an id for b2share or data cat id")
return -1
oid = params['oid']
hook = DataCatalogHook()
try:
entry = json.loads(hook.get_entry('dataset', oid))
if entry and 'b2share' in entry['url']:
print(f"Got data cat b2share entry: {entry}\nwith url: {entry['url']}")
oid = entry['url'].split('/')[-1]
print(f"Extracted oid {oid}")
else:
print('No entry in data cat or not a b2share entry')
except:
# falling back to b2share
print("No entry found. Probably a b2share object")
connection = Connection.get_connection_from_secrets('default_b2share')
server = connection.get_uri()
print(f"Rereiving data from {server}")
params = kwargs['params']
if 'oid' not in params: # {"oid":"b38609df2b334ea296ea1857e568dbea"}
print("Missing object id in pipeline parameters")
lst = get_objects(server=server)
flist = {o['id']: [f['key'] for f in o['files']] for o in lst}
print(f"Objects on server: {flist}")
return -1 # non zero exit code is a task failure
oid = params['oid']
obj = get_object_md(server=server, oid=oid)
print(f"Retrieved object {oid}: {obj}")
# check status?
flist = get_file_list(obj)
return flist
......
from importlib.metadata import metadata
import os
import shutil
import requests
......@@ -6,10 +7,12 @@ from airflow.decorators import dag, task
from airflow.models.connection import Connection
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.utils.dates import days_ago
from datacat_integration.hooks import DataCatalogHook
import json
from b2shareoperator import (get_file_list, get_object_md,
get_objects)
from b2shareoperator import (get_file_list, get_object_md)
default_args = {
'owner': 'airflow',
......@@ -20,19 +23,31 @@ default_args = {
def taskflow_stream():
@task(multiple_outputs=True)
def get_flist(**kwargs):
params = kwargs['params']
if 'oid' not in params:
print("Missing object id in pipeline parameters. Please provide an id for b2share or data cat id")
return -1
oid = params['oid']
hook = DataCatalogHook()
try:
entry = json.loads(hook.get_entry('dataset', oid))
if entry and 'b2share' in entry['url']:
print(f"Got data cat b2share entry: {entry}\nwith url: {entry['url']}")
oid = entry['url'].split('/')[-1]
print(f"Extracted oid {oid}")
else:
print('No entry in data cat or not a b2share entry')
except:
# falling back to b2share
print("No entry found. Probably a b2share object")
connection = Connection.get_connection_from_secrets('default_b2share')
server = connection.get_uri()
print(f"Rereiving data from {server}")
params = kwargs['params']
if 'oid' not in params: # {"oid":"b38609df2b334ea296ea1857e568dbea"}
print("Missing object id in pipeline parameters")
lst = get_objects(server=server)
flist = {o['id']: [f['key'] for f in o['files']] for o in lst}
print(f"Objects on server: {flist}")
return -1
oid = params['oid']
obj = get_object_md(server=server, oid=oid)
print(f"Retrieved object {oid}: {obj}")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment