Skip to content
Snippets Groups Projects
Commit b18868b7 authored by Jedrzej Rybicki's avatar Jedrzej Rybicki
Browse files

image transfer dag

parent 29f0625c
No related branches found
No related tags found
No related merge requests found
...@@ -39,7 +39,7 @@ def get_connection(conn_id, **kwargs): ...@@ -39,7 +39,7 @@ def get_connection(conn_id, **kwargs):
# for now SSH is hardcoded # for now SSH is hardcoded
params = kwargs['params'] params = kwargs['params']
host = params.get('host') host = params.get('host')
port = int(params.get('port', 2222)) port = int(params.get('port', 22))
user = params.get('login', 'eflows') user = params.get('login', 'eflows')
hook = SSHHook(remote_host=host, port=port, username=user) hook = SSHHook(remote_host=host, port=port, username=user)
# key in vault should be in form of formated string: # key in vault should be in form of formated string:
......
import os
import shutil
import requests
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
from decors import setup, get_connection, remove
default_args = {
'owner': 'airflow',
}
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def transfer_image():
@task
def stream_upload(connection_id, **kwargs):
params = kwargs['params']
target = params.get('target', '/tmp/')
image_id = params.get('image_id', 'wordcount_skylake.sif')
url = f"https://bscgrid20.bsc.es/image_creation/images/download/{image_id}"
print(f"Putting {url} --> {target} connection")
ssh_hook = get_connection(conn_id=connection_id, **kwargs)
with ssh_hook.get_conn() as ssh_client:
sftp_client = ssh_client.open_sftp()
ssh_client.exec_command(command=f"mkdir -p {target}")
with requests.get(url, stream=True, verify=False) as r:
with sftp_client.open(os.path.join(target, image_id), 'wb') as f:
shutil.copyfileobj(r.raw, f)
setup_task = PythonOperator(
python_callable=setup, task_id='setup_connection')
a_id = setup_task.output['return_value']
cleanup_task = PythonOperator(python_callable=remove, op_kwargs={
'conn_id': a_id}, task_id='cleanup')
setup_task >> stream_upload(connection_id=a_id) >> cleanup_task
dag = transfer_image()
...@@ -105,8 +105,7 @@ def upload_example(): ...@@ -105,8 +105,7 @@ def upload_example():
setup_task = PythonOperator( setup_task = PythonOperator(python_callable=setup, task_id='setup_connection')
python_callable=setup, task_id='setup_connection')
a_id = setup_task.output['return_value'] a_id = setup_task.output['return_value']
files = load(connection_id=a_id) files = load(connection_id=a_id)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment