diff --git a/dags/b2shareoperator.py b/dags/b2shareoperator.py index 628fd9bf5f7de2afbbce45bf59211f22f1ba1708..f9a4cb3fd36031a3d6313fb875c8373756c6719f 100644 --- a/dags/b2shareoperator.py +++ b/dags/b2shareoperator.py @@ -21,7 +21,7 @@ def get_object_md(server, oid): def download_file(url: str, target_dir: str): - fname = tempfile.mkstemp(dir=target_dir) + fname = tempfile.mktemp(dir=target_dir) urllib.request.urlretrieve(url=url, filename=fname) return fname diff --git a/dags/taskflow.py b/dags/taskflow.py index c346559d1087fc9a5765cdcd14cf4604d2f12185..15c4efc5375e475ca7bb83d4ffdfc8aec06269eb 100644 --- a/dags/taskflow.py +++ b/dags/taskflow.py @@ -1,27 +1,37 @@ -"Example of new taskflow api" - from airflow.decorators import dag, task from airflow.utils.dates import days_ago from airflow.models.connection import Connection +from airflow.models.dagrun import DagRun import requests import urllib.request import tempfile -from b2shareoperator import get_file_list, download_file, get_object_md +from b2shareoperator import get_file_list, download_file, get_object_md, get_objects default_args = { 'owner': 'airflow', } @dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example']) -def taskflow_example(): - @task() - def extract(oid: str): +def taskflow_example(**kwargs): + @task(multiple_outputs=True) + def extract(**kwargs): connection = Connection.get_connection_from_secrets('default_b2share') server = connection.get_uri() print(f"Rereiving data from {server}") + + params = kwargs['params'] + if 'oid' not in params: + print(f"Missing object id in pipeline parameters") + lst = get_objects(server=server) + flist = {o['id']: [f['key'] for f in o['files']] for o in lst} + print(f"Objects on server: {flist}") + return {} + else: + oid = params['oid'] + obj = get_object_md(server=server, oid=oid) - print(f"Object: {obj}") + print(f"Retrieved object {oid}: {obj}") flist = get_file_list(obj) return flist @@ -38,8 +48,9 @@ def taskflow_example(): def load(files: dict): print(f"Total files downloaded: {len(files)}") - data = extract(oid = 'b38609df2b334ea296ea1857e568dbea') + + data = extract() files = transform(data) load(files) - + dag = taskflow_example()