From dcec1a4dafaaa06f67703f4388d0a783ebe6b1bf Mon Sep 17 00:00:00 2001
From: jrybicki-jsc <j.rybicki@fz-juelich.de>
Date: Fri, 27 Aug 2021 10:53:56 +0200
Subject: [PATCH] etl scaffolding

---
 README.md               | 17 ++++++++++++++++-
 dags/b2shareoperator.py | 33 +++++++++++++++++++++++++--------
 dags/firsto.py          |  2 +-
 dags/taskflow.py        | 31 ++++++++++++++++++++++++-------
 4 files changed, 66 insertions(+), 17 deletions(-)

diff --git a/README.md b/README.md
index 2d4b427..c4438ce 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,18 @@
 # Data Logistics Service
 
-eFlows4HPC Data Logistics Service
\ No newline at end of file
+eFlows4HPC Data Logistics Service
+
+
+```
+mkdir ./logs ./plugins
+echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
+docker-compose -f dockers/docker-compose.yaml --project-directory . up airflow-init
+```
+
+```
+docker-compose -f dockers/docker-compose.yaml --project-directory . up -d
+```
+
+## Setup connection
+curl -X POST -u creds -H "Content-Type: application/json"  --data '{"connection_id": "default_b2share","conn_type":"https", "host": "b2share-testing.fz-juelich.de", "schema":""}' localhost:7001/api/v1/connections
+
diff --git a/dags/b2shareoperator.py b/dags/b2shareoperator.py
index 966c226..4ef4a9c 100644
--- a/dags/b2shareoperator.py
+++ b/dags/b2shareoperator.py
@@ -2,11 +2,28 @@ from airflow.models.baseoperator import BaseOperator
 from airflow.models.connection import Connection
 import requests
 from urllib.parse import urljoin
+import tempfile
 
 def get_objects(server):
     lst = requests.get(urljoin(server, 'api/records')).json()
     return lst['hits']['hits']
 
+def get_file_list(obj):
+    file_url = obj['links']['files']
+    fls = requests.get(file_url).json()
+
+    return {it['key']: it['links']['self'] for it in fls['contents']}
+
+def get_object_md(server, oid):
+    obj= requests.get(urljoin(server, f"api/records/{oid}")).json()
+    return obj
+
+def download_file(url: str, target_dir: str):
+    _, fname = tempfile.mkstemp(dir=target_dir)
+    urllib.request.urlretrieve(url=url, filename=fname)
+    return fname
+
+
 server='https://b2share-testing.fz-juelich.de/'
 
 class B2ShareOperator(BaseOperator):
@@ -18,16 +35,16 @@ class B2ShareOperator(BaseOperator):
             **kwargs) -> None:
         super().__init__(**kwargs)
         self.name = name
-        self.connection = Connection.get_connection_from_secrets(conn_id)
+        self.conn_id = conn_id
+        print(kwargs)
 
     def execute(self, context):
-        message = "Hello {}".format(self.name)
-        print(message)
 
-        print(self.connection.get_uri())
+        connection = Connection.get_connection_from_secrets(self.conn_id)
+        print(f"Rereiving data from {connection.get_uri()}")
 
-        #print(f"Retrieving info from {self.connection.host}")
-        lst = get_objects(server=server)
-        print(f"GOT: {lst}")
+        lst = get_objects(server=connection.get_uri())
+        flist = {o['id']: [f['key'] for f in o['files']] for o in lst}
+        print(f"GOT: {flist}")
         print(self.params)
-        return message
+        return len(flist)
diff --git a/dags/firsto.py b/dags/firsto.py
index 5ec0a48..eb2a6b1 100644
--- a/dags/firsto.py
+++ b/dags/firsto.py
@@ -24,5 +24,5 @@ with DAG('firsto', default_args=def_args, description='first dag', schedule_inte
 
     t3 = B2ShareOperator(task_id='task_b2sh', dag=dag, name='B2Share')
 
-    t1 >> t2
+    t1 >> t2 >> t3
 
diff --git a/dags/taskflow.py b/dags/taskflow.py
index 16a6b4b..8e80f63 100644
--- a/dags/taskflow.py
+++ b/dags/taskflow.py
@@ -3,25 +3,42 @@
 
 from airflow.decorators import dag, task
 from airflow.utils.dates import days_ago
+from airflow.models.connection import Connection
+import requests
+import urllib.request
+import tempfile
+from b2shareoperator import get_file_list, download_file, get_object_md
 
 default_args = {
     'owner': 'airflow',
 }
+
 @dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
 def taskflow_example():
     @task()
-    def extract():
-        return {'key': 'value', 'key2': 'value2'}
+    def extract(oid: str):
+        connection = Connection.get_connection_from_secrets('default_b2share')
+        server = connection.get_uri()
+        print(f"Rereiving data from {server}")
+        obj = get_object_md(server=server, oid=oid)
+        print(f"Object: {obj}")
+        flist = get_file_list(obj)
+        return flist
 
     @task(multiple_outputs=True)
-    def transform(inps: dict):
-        return {"keys": len(inps)}
+    def transform(flist: dict):
+        name_mappings = {}
+        for fname, url in flist.items():
+            print(f"Processing: {fname} --> {url}")
+            tmpname = download_file(url=url, target_dir='/tmp/downs/')
+            name_mappings[fname]=tmpname
+        return name_mappings
 
     @task()
-    def load(lengths: float):
-        print(f"Total length value is: {lengths:.2f}")
+    def load(files: dict):
+        print(f"Total files downloaded: {len(files)}")
 
-    data = extract()
+    data = extract(oid = 'b38609df2b334ea296ea1857e568dbea')
     summary = transform(data)
     load(summary["keys"])
 
-- 
GitLab