diff --git a/README.md b/README.md index 2d4b42788dcdd9be79e305ddd6405ddfa0b13dc6..c4438ce043f62e5f27709db5552036f1d72883c2 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,18 @@ # Data Logistics Service -eFlows4HPC Data Logistics Service \ No newline at end of file +eFlows4HPC Data Logistics Service + + +``` +mkdir ./logs ./plugins +echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env +docker-compose -f dockers/docker-compose.yaml --project-directory . up airflow-init +``` + +``` +docker-compose -f dockers/docker-compose.yaml --project-directory . up -d +``` + +## Setup connection +curl -X POST -u creds -H "Content-Type: application/json" --data '{"connection_id": "default_b2share","conn_type":"https", "host": "b2share-testing.fz-juelich.de", "schema":""}' localhost:7001/api/v1/connections + diff --git a/dags/b2shareoperator.py b/dags/b2shareoperator.py index 966c226a380a66129b1ff0c430d59dc0ec8e9c55..4ef4a9c8ba34c4c816a9c6afad0bca9a1a8cf1c0 100644 --- a/dags/b2shareoperator.py +++ b/dags/b2shareoperator.py @@ -2,11 +2,28 @@ from airflow.models.baseoperator import BaseOperator from airflow.models.connection import Connection import requests from urllib.parse import urljoin +import tempfile def get_objects(server): lst = requests.get(urljoin(server, 'api/records')).json() return lst['hits']['hits'] +def get_file_list(obj): + file_url = obj['links']['files'] + fls = requests.get(file_url).json() + + return {it['key']: it['links']['self'] for it in fls['contents']} + +def get_object_md(server, oid): + obj= requests.get(urljoin(server, f"api/records/{oid}")).json() + return obj + +def download_file(url: str, target_dir: str): + _, fname = tempfile.mkstemp(dir=target_dir) + urllib.request.urlretrieve(url=url, filename=fname) + return fname + + server='https://b2share-testing.fz-juelich.de/' class B2ShareOperator(BaseOperator): @@ -18,16 +35,16 @@ class B2ShareOperator(BaseOperator): **kwargs) -> None: super().__init__(**kwargs) self.name = name - self.connection = Connection.get_connection_from_secrets(conn_id) + self.conn_id = conn_id + print(kwargs) def execute(self, context): - message = "Hello {}".format(self.name) - print(message) - print(self.connection.get_uri()) + connection = Connection.get_connection_from_secrets(self.conn_id) + print(f"Rereiving data from {connection.get_uri()}") - #print(f"Retrieving info from {self.connection.host}") - lst = get_objects(server=server) - print(f"GOT: {lst}") + lst = get_objects(server=connection.get_uri()) + flist = {o['id']: [f['key'] for f in o['files']] for o in lst} + print(f"GOT: {flist}") print(self.params) - return message + return len(flist) diff --git a/dags/firsto.py b/dags/firsto.py index 5ec0a48e0b082fb1ed8823aa7a27afc7ba7fc41c..eb2a6b1e3c7ee8d855ab77c83c0bdd21c95d8c26 100644 --- a/dags/firsto.py +++ b/dags/firsto.py @@ -24,5 +24,5 @@ with DAG('firsto', default_args=def_args, description='first dag', schedule_inte t3 = B2ShareOperator(task_id='task_b2sh', dag=dag, name='B2Share') - t1 >> t2 + t1 >> t2 >> t3 diff --git a/dags/taskflow.py b/dags/taskflow.py index 16a6b4b43351692718515c30be604ced6fefec5b..8e80f636e6eb1b57d5adb97da2eec18f63f66bcd 100644 --- a/dags/taskflow.py +++ b/dags/taskflow.py @@ -3,25 +3,42 @@ from airflow.decorators import dag, task from airflow.utils.dates import days_ago +from airflow.models.connection import Connection +import requests +import urllib.request +import tempfile +from b2shareoperator import get_file_list, download_file, get_object_md default_args = { 'owner': 'airflow', } + @dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example']) def taskflow_example(): @task() - def extract(): - return {'key': 'value', 'key2': 'value2'} + def extract(oid: str): + connection = Connection.get_connection_from_secrets('default_b2share') + server = connection.get_uri() + print(f"Rereiving data from {server}") + obj = get_object_md(server=server, oid=oid) + print(f"Object: {obj}") + flist = get_file_list(obj) + return flist @task(multiple_outputs=True) - def transform(inps: dict): - return {"keys": len(inps)} + def transform(flist: dict): + name_mappings = {} + for fname, url in flist.items(): + print(f"Processing: {fname} --> {url}") + tmpname = download_file(url=url, target_dir='/tmp/downs/') + name_mappings[fname]=tmpname + return name_mappings @task() - def load(lengths: float): - print(f"Total length value is: {lengths:.2f}") + def load(files: dict): + print(f"Total files downloaded: {len(files)}") - data = extract() + data = extract(oid = 'b38609df2b334ea296ea1857e568dbea') summary = transform(data) load(summary["keys"])