Commit 68360d60 authored by Christian Boettcher

Merge branch 'main' of gitlab.jsc.fz-juelich.de:eflows4hpc-wp2/data-logistics-service into main

parents f433c5a8 37dc5ecb
Pipeline #96509 passed
@@ -39,7 +39,7 @@ def get_connection(conn_id, **kwargs):
     # for now SSH is hardcoded
     params = kwargs['params']
     host = params.get('host')
-    port = int(params.get('port', 2222))
+    port = int(params.get('port', 22))
     user = params.get('login', 'eflows')
     hook = SSHHook(remote_host=host, port=port, username=user)
     # key in vault should be in form of formated string:
...
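For context, a minimal sketch (with a hypothetical trigger payload; the host name is made up) of how these connection defaults resolve when the DAG run configuration omits +port+:

----
# Hypothetical payload: 'host' and 'login' set, 'port' omitted.
params = {'host': 'login.example.org', 'login': 'eflows'}

host = params.get('host')
port = int(params.get('port', 22))   # now falls back to the standard SSH port
user = params.get('login', 'eflows')

assert (host, port, user) == ('login.example.org', 22, 'eflows')
----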
dags/image_transfer.py (new file):

import os
import shutil

import requests
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator

from decors import setup, get_connection, remove

default_args = {
    'owner': 'airflow',
}


@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def transfer_image():

    @task
    def stream_upload(connection_id, **kwargs):
        params = kwargs['params']
        target = params.get('target', '/tmp/')
        image_id = params.get('image_id', 'wordcount_skylake.sif')
        url = f"https://bscgrid20.bsc.es/image_creation/images/download/{image_id}"
        print(f"Putting {url} --> {target} connection")

        ssh_hook = get_connection(conn_id=connection_id, **kwargs)
        with ssh_hook.get_conn() as ssh_client:
            sftp_client = ssh_client.open_sftp()
            # make sure the target directory exists on the remote machine
            ssh_client.exec_command(command=f"mkdir -p {target}")
            # stream the image from the build service straight into the
            # remote file; no local copy is kept on the worker
            with requests.get(url, stream=True, verify=False) as r:
                with sftp_client.open(os.path.join(target, image_id), 'wb') as f:
                    shutil.copyfileobj(r.raw, f)

    setup_task = PythonOperator(
        python_callable=setup, task_id='setup_connection')
    a_id = setup_task.output['return_value']

    cleanup_task = PythonOperator(python_callable=remove, op_kwargs={
        'conn_id': a_id}, task_id='cleanup')

    setup_task >> stream_upload(connection_id=a_id) >> cleanup_task


dag = transfer_image()
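The upload above hinges on +shutil.copyfileobj+ streaming between two file-like objects, which is why no local copy of the image is needed. A self-contained sketch of that idea, with in-memory stand-ins for the HTTP response and the SFTP file:

----
# Illustration of the streaming copy used in stream_upload:
# shutil.copyfileobj moves data between file-like objects in fixed-size
# chunks, so the full image never has to fit in memory at once.
import io
import shutil

src = io.BytesIO(b"x" * (1 << 20))   # stands in for r.raw (the HTTP response body)
dst = io.BytesIO()                   # stands in for the remote SFTP file handle
shutil.copyfileobj(src, dst, length=64 * 1024)  # copy in 64 KiB chunks
assert dst.getvalue() == src.getvalue()
----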
@@ -105,8 +105,7 @@ def upload_example():
-    setup_task = PythonOperator(
-        python_callable=setup, task_id='setup_connection')
+    setup_task = PythonOperator(python_callable=setup, task_id='setup_connection')
     a_id = setup_task.output['return_value']
     files = load(connection_id=a_id)
...
@@ -112,6 +112,18 @@ curl -X POST -u USER:PASS -H "Content-Type: application/json" \
 ----
+
+=== Image transfer example
+
+To transfer images from the eFlows4HPC image build service, use the DAG defined in +dags/image_transfer.py+ (+transfer_image+). It requires two parameters: +image_id+, the name of the image in the image build service (e.g. "wordcount_skylake.sif"), and +target+, the path on the target system to which the image will be transferred.
+
+The parameters should be passed along with the credentials as described in <<credentials>>. The target directory will be created with ``mkdir -p`` on the target machine. The image is streamed directly to the target location (no local copy on the DLS worker).
+
+----
+curl -X POST -u USER:PASS -H "Content-Type: application/json" \
+    --data '{"conf": {"image_id": imageID, "target": PATH}}' \
+    $DLS/dags/transfer_image/dagRuns
+----
+
 === Comments
 I could imagine that the name of a DLS pipeline (+taskflow_example+) can change and needs to be passed as a parameter to YORC.
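For scripted use, the same trigger as the documented curl call can be issued from Python; a sketch, where the DLS base URL and the credentials are placeholders:

----
# Sketch: trigger the transfer_image DAG over the same endpoint as the
# curl example above; DLS, USER and PASS are placeholders.
import requests

DLS = "https://dls.example.org"  # same base URL as $DLS in the docs

resp = requests.post(
    f"{DLS}/dags/transfer_image/dagRuns",
    auth=("USER", "PASS"),
    json={"conf": {"image_id": "wordcount_skylake.sif", "target": "/tmp/"}},
)
resp.raise_for_status()
print(resp.json())
----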