Skip to content
Snippets Groups Projects
Commit 7bed9058 authored by Jedrzej Rybicki
Browse files

two step image transfer dag

parent 5fe67886
Branches
No related tags found
No related merge requests found
Pipeline #97301 passed
import os
import shlex
import shutil

import requests
from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

from decors import setup, get_connection, remove
from just_reg import get_parameter
# Defaults handed to the @dag decorator via its default_args parameter.
default_args = dict(owner='airflow')
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def transfer_image_alt():
    """Two-step image transfer DAG.

    Task order: ``setup_connection`` -> ``im_download`` -> ``im_upload``
    -> ``cleanup``. The image is first downloaded into a local working
    directory and then, only if the ``upload`` parameter is truthy, copied
    to the target machine over SFTP.
    """

    @task
    def im_download(connection_id, **kwargs):
        """Download the requested image into the local working directory.

        Args:
            connection_id: Not used by the body; it is fed from the setup
                task's output, which places this task after the setup task.
            **kwargs: Forwarded to ``get_parameter`` to resolve ``image_id``.

        Raises:
            requests.HTTPError: If the server answers with a 4xx/5xx status.
        """
        work_dir = Variable.get("working_dir", default_var='/tmp/')
        image_id = get_parameter(
            'image_id', default='wordcount_skylake.sif', **kwargs)
        url = f"https://bscgrid20.bsc.es/image_creation/images/download/{image_id}"
        local_path = os.path.join(work_dir, image_id)

        print(f"Putting {url} --> {work_dir} connection")
        # NOTE(review): verify=False disables TLS certificate verification.
        # Kept as in the original because the server's certificate setup is
        # not visible here -- confirm whether it can be turned back on.
        with requests.get(url, stream=True, verify=False) as response:
            # Fail the task on an HTTP error instead of silently saving the
            # error body as if it were the image. Checked before the local
            # file is created so no bogus file is left behind.
            response.raise_for_status()
            with open(local_path, 'wb') as image_file:
                shutil.copyfileobj(response.raw, image_file)

    @task
    def im_upload(connection_id, **kwargs):
        """Copy the downloaded image to the target over SFTP, then delete it locally.

        Args:
            connection_id: Identifier passed to ``get_connection`` to obtain
                the SSH hook.
            **kwargs: Forwarded to ``get_parameter`` / ``get_connection``;
                the parameters read are ``upload``, ``target`` and
                ``image_id``.

        Returns:
            0 when the ``upload`` parameter is falsy and the upload is
            skipped; otherwise None.
        """
        if not get_parameter('upload', False, **kwargs):
            print('Skipping upload')
            return 0

        work_dir = Variable.get("working_dir", default_var='/tmp/')
        target = get_parameter('target', default='/tmp/', **kwargs)
        image_id = get_parameter(
            'image_id', default='wordcount_skylake.sif', **kwargs)
        local_path = os.path.join(work_dir, image_id)
        remote_path = os.path.join(target, image_id)

        ssh_hook = get_connection(conn_id=connection_id, **kwargs)
        print(
            f"Copying local {local_path} -> {connection_id}:{target}")

        # Assumes get_conn() yields a paramiko-style SSH client (it is used
        # with exec_command/open_sftp) -- confirm against decors.get_connection.
        with ssh_hook.get_conn() as ssh_client:
            # `target` comes from run parameters, so quote it before it
            # reaches the remote shell.
            _, mkdir_stdout, _ = ssh_client.exec_command(
                command=f"mkdir -p {shlex.quote(target)}")
            # exec_command returns immediately; wait for mkdir to finish so
            # the directory exists before the remote file is opened.
            mkdir_status = mkdir_stdout.channel.recv_exit_status()
            if mkdir_status != 0:
                print(f"mkdir -p on target exited with status {mkdir_status}")

            # Context manager closes the SFTP channel even if the copy fails.
            with ssh_client.open_sftp() as sftp_client:
                with open(local_path, 'rb') as local_file:
                    with sftp_client.open(remote_path, 'wb') as remote_file:
                        shutil.copyfileobj(local_file, remote_file)

        print('Removing local copy')
        os.unlink(local_path)

    setup_task = PythonOperator(
        python_callable=setup, task_id='setup_connection')
    # Output of the setup task; handed to the tasks below as the connection id.
    a_id = setup_task.output['return_value']
    cleanup_task = PythonOperator(
        python_callable=remove, op_kwargs={'conn_id': a_id}, task_id='cleanup')

    setup_task >> im_download(connection_id=a_id) >> im_upload(
        connection_id=a_id) >> cleanup_task


dag = transfer_image_alt()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment