diff --git a/dags/decors.py b/dags/decors.py
index 2f9fd133aef585975fc871f4f9c70fb8d712f297..034e6a30be3367fb56624d67542c3e3237c6848a 100644
--- a/dags/decors.py
+++ b/dags/decors.py
@@ -39,7 +39,7 @@ def get_connection(conn_id, **kwargs):
     # for now SSH is hardcoded
     params = kwargs['params']
     host = params.get('host')
-    port = int(params.get('port', 2222))
+    port = int(params.get('port', 22))
     user = params.get('login', 'eflows')
     hook = SSHHook(remote_host=host, port=port, username=user)
     # key in vault should be in form of formated string:
diff --git a/dags/image_transfer.py b/dags/image_transfer.py
new file mode 100644
index 0000000000000000000000000000000000000000..7fbce7b40e123373a2e930a098cbe9df40f7f214
--- /dev/null
+++ b/dags/image_transfer.py
@@ -0,0 +1,62 @@
+import os
+import shlex
+import shutil
+import requests
+
+from airflow.decorators import dag, task
+from airflow.utils.dates import days_ago
+from airflow.operators.python import PythonOperator
+
+from decors import setup, get_connection, remove
+
+default_args = {
+    'owner': 'airflow',
+}
+
+
+@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
+def transfer_image():
+    """Stream a container image from the image creation service to a remote host over SFTP."""
+
+    @task
+    def stream_upload(connection_id, **kwargs):
+        """Download the image over HTTP and write it into the target directory via SFTP.
+
+        DAG params read: ``target`` (remote directory, default ``/tmp/``) and
+        ``image_id`` (image file name, default ``wordcount_skylake.sif``).
+        """
+        params = kwargs['params']
+        target = params.get('target', '/tmp/')
+        image_id = params.get('image_id', 'wordcount_skylake.sif')
+        url = f"https://bscgrid20.bsc.es/image_creation/images/download/{image_id}"
+
+        print(f"Putting {url} --> {target} connection")
+        ssh_hook = get_connection(conn_id=connection_id, **kwargs)
+
+        with ssh_hook.get_conn() as ssh_client:
+            # exec_command returns as soon as the command is started, so wait
+            # for mkdir to finish before opening a file inside the directory.
+            # The path is quoted because it comes from user-supplied params.
+            _, stdout, _ = ssh_client.exec_command(command=f"mkdir -p {shlex.quote(target)}")
+            if stdout.channel.recv_exit_status() != 0:
+                raise RuntimeError(f"Could not create remote directory {target}")
+
+            # NOTE(review): verify=False disables TLS certificate checks;
+            # kept as in the original, confirm whether it is still required.
+            with requests.get(url, stream=True, verify=False) as r:
+                # Fail before writing anything if the download was rejected,
+                # otherwise the HTTP error body would be stored as the image.
+                r.raise_for_status()
+                with ssh_client.open_sftp() as sftp_client:
+                    with sftp_client.open(os.path.join(target, image_id), 'wb') as f:
+                        shutil.copyfileobj(r.raw, f)
+
+    setup_task = PythonOperator(python_callable=setup, task_id='setup_connection')
+    a_id = setup_task.output['return_value']
+    cleanup_task = PythonOperator(python_callable=remove, op_kwargs={
+        'conn_id': a_id}, task_id='cleanup')
+
+    setup_task >> stream_upload(connection_id=a_id) >> cleanup_task
+
+
+dag = transfer_image()
diff --git a/dags/uploadflow.py b/dags/uploadflow.py
index e38a40f045ede3ab62ac45ef1b5a564b49e4fa0b..ff70aff2ab6e509fca0baca75d9c8ef55d26ec17 100644
--- a/dags/uploadflow.py
+++ b/dags/uploadflow.py
@@ -105,8 +105,7 @@ def upload_example():
 
 
 
-    setup_task = PythonOperator(
-        python_callable=setup, task_id='setup_connection')
+    setup_task = PythonOperator(python_callable=setup, task_id='setup_connection')
     a_id = setup_task.output['return_value']
 
     files = load(connection_id=a_id)