Skip to content
Snippets Groups Projects
Commit dcec1a4d authored by Jedrzej Rybicki's avatar Jedrzej Rybicki
Browse files

etl scaffolding

parent e5eade13
No related branches found
No related tags found
No related merge requests found
Pipeline #76947 passed
# Data Logistics Service
eFlows4HPC Data Logistics Service
```
mkdir ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
docker-compose -f dockers/docker-compose.yaml --project-directory . up airflow-init
```
```
docker-compose -f dockers/docker-compose.yaml --project-directory . up -d
```
## Setup connection
curl -X POST -u creds -H "Content-Type: application/json" --data '{"connection_id": "default_b2share","conn_type":"https", "host": "b2share-testing.fz-juelich.de", "schema":""}' localhost:7001/api/v1/connections
......@@ -2,11 +2,28 @@ from airflow.models.baseoperator import BaseOperator
from airflow.models.connection import Connection
import requests
from urllib.parse import urljoin
import tempfile
def get_objects(server):
lst = requests.get(urljoin(server, 'api/records')).json()
return lst['hits']['hits']
def get_file_list(obj):
file_url = obj['links']['files']
fls = requests.get(file_url).json()
return {it['key']: it['links']['self'] for it in fls['contents']}
def get_object_md(server, oid):
obj= requests.get(urljoin(server, f"api/records/{oid}")).json()
return obj
def download_file(url: str, target_dir: str):
_, fname = tempfile.mkstemp(dir=target_dir)
urllib.request.urlretrieve(url=url, filename=fname)
return fname
server='https://b2share-testing.fz-juelich.de/'
class B2ShareOperator(BaseOperator):
......@@ -18,16 +35,16 @@ class B2ShareOperator(BaseOperator):
**kwargs) -> None:
super().__init__(**kwargs)
self.name = name
self.connection = Connection.get_connection_from_secrets(conn_id)
self.conn_id = conn_id
print(kwargs)
def execute(self, context):
message = "Hello {}".format(self.name)
print(message)
print(self.connection.get_uri())
connection = Connection.get_connection_from_secrets(self.conn_id)
print(f"Rereiving data from {connection.get_uri()}")
#print(f"Retrieving info from {self.connection.host}")
lst = get_objects(server=server)
print(f"GOT: {lst}")
lst = get_objects(server=connection.get_uri())
flist = {o['id']: [f['key'] for f in o['files']] for o in lst}
print(f"GOT: {flist}")
print(self.params)
return message
return len(flist)
......@@ -24,5 +24,5 @@ with DAG('firsto', default_args=def_args, description='first dag', schedule_inte
t3 = B2ShareOperator(task_id='task_b2sh', dag=dag, name='B2Share')
t1 >> t2
t1 >> t2 >> t3
......@@ -3,25 +3,42 @@
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.models.connection import Connection
import requests
import urllib.request
import tempfile
from b2shareoperator import get_file_list, download_file, get_object_md
default_args = {
'owner': 'airflow',
}
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def taskflow_example():
@task()
def extract():
return {'key': 'value', 'key2': 'value2'}
def extract(oid: str):
connection = Connection.get_connection_from_secrets('default_b2share')
server = connection.get_uri()
print(f"Rereiving data from {server}")
obj = get_object_md(server=server, oid=oid)
print(f"Object: {obj}")
flist = get_file_list(obj)
return flist
@task(multiple_outputs=True)
def transform(inps: dict):
return {"keys": len(inps)}
def transform(flist: dict):
name_mappings = {}
for fname, url in flist.items():
print(f"Processing: {fname} --> {url}")
tmpname = download_file(url=url, target_dir='/tmp/downs/')
name_mappings[fname]=tmpname
return name_mappings
@task()
def load(lengths: float):
print(f"Total length value is: {lengths:.2f}")
def load(files: dict):
print(f"Total files downloaded: {len(files)}")
data = extract()
data = extract(oid = 'b38609df2b334ea296ea1857e568dbea')
summary = transform(data)
load(summary["keys"])
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment