From 36c12439a329c362f221da7b425051032ecc7aa2 Mon Sep 17 00:00:00 2001
From: jrybicki-jsc <j.rybicki@fz-juelich.de>
Date: Tue, 31 Aug 2021 10:10:23 +0200
Subject: [PATCH] passing oid as parameter check #4

---
 dags/b2shareoperator.py |  5 ++++-
 dags/taskflow.py        | 29 ++++++++++++++++++++---------
 2 files changed, 24 insertions(+), 10 deletions(-)

diff --git a/dags/b2shareoperator.py b/dags/b2shareoperator.py
index 628fd9b..f9a4cb3 100644
--- a/dags/b2shareoperator.py
+++ b/dags/b2shareoperator.py
@@ -21,7 +21,10 @@ def get_object_md(server, oid):
 
 def download_file(url: str, target_dir: str):
+    """Download url into a new file inside target_dir and return that file's path."""
     
-    fname = tempfile.mkstemp(dir=target_dir)
+    # mktemp() only invents a name (racy, deprecated); reserve a real file instead.
+    with tempfile.NamedTemporaryFile(dir=target_dir, delete=False) as tmp:
+        fname = tmp.name
     urllib.request.urlretrieve(url=url, filename=fname)
     return fname
 
diff --git a/dags/taskflow.py b/dags/taskflow.py
index c346559..15c4efc 100644
--- a/dags/taskflow.py
+++ b/dags/taskflow.py
@@ -1,27 +1,37 @@
 
-"Example of new taskflow api"
-
 from airflow.decorators import dag, task
 from airflow.utils.dates import days_ago
 from airflow.models.connection import Connection
+from airflow.models.dagrun import DagRun
 import requests
 import urllib.request
 import tempfile
-from b2shareoperator import get_file_list, download_file, get_object_md
+from b2shareoperator import get_file_list, download_file, get_object_md, get_objects
 
 default_args = {
     'owner': 'airflow',
 }
 
 @dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
-def taskflow_example():
-    @task()
-    def extract(oid: str):
+def taskflow_example(**kwargs):
+    @task(multiple_outputs=True)
+    def extract(**kwargs):
+        """Look up the object given by params['oid'] and return its file list ({} when no oid is set)."""
         connection = Connection.get_connection_from_secrets('default_b2share')
         server = connection.get_uri()
         print(f"Rereiving data from {server}")
+        # The object id is read from the `params` entry of the keyword arguments.
+        params = kwargs['params']
+        if 'oid' not in params:
+            # No id supplied: print the objects available on the server and return an empty result.
+            print("Missing object id in pipeline parameters")
+            lst = get_objects(server=server)
+            flist = {o['id']: [f['key'] for f in o['files']] for o in lst}
+            print(f"Objects on server: {flist}")
+            return {}
+        oid = params['oid']
         obj = get_object_md(server=server, oid=oid)
-        print(f"Object: {obj}")
+        print(f"Retrieved object {oid}: {obj}")
         flist = get_file_list(obj)
         return flist
 
@@ -38,8 +48,9 @@ def taskflow_example():
     def load(files: dict):
         print(f"Total files downloaded: {len(files)}")
 
-    data = extract(oid = 'b38609df2b334ea296ea1857e568dbea')
+    
+    data = extract()
     files = transform(data)
     load(files)
-
+    
 dag = taskflow_example()
-- 
GitLab