From 6861c0e8d69f74c57f79612458c31ce7cf074af4 Mon Sep 17 00:00:00 2001 From: jrybicki-jsc <j.rybicki@fz-juelich.de> Date: Fri, 8 Apr 2022 17:55:18 +0200 Subject: [PATCH] pipeline to save the time --- dags/image_transfer.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/dags/image_transfer.py b/dags/image_transfer.py index 7fbce7b..caf6d42 100644 --- a/dags/image_transfer.py +++ b/dags/image_transfer.py @@ -5,6 +5,7 @@ import requests from airflow.decorators import dag, task from airflow.utils.dates import days_ago from airflow.operators.python import PythonOperator +import paramiko from decors import setup, get_connection, remove @@ -12,6 +13,14 @@ default_args = { 'owner': 'airflow', } +class FastTransport(paramiko.Transport): + + def __init__(self, sock): + super(FastTransport, self).__init__(sock) + self.window_size = 2147483647 + self.packetizer.REKEY_BYTES = pow(2, 40) + self.packetizer.REKEY_PACKETS = pow(2, 40) + @dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example']) def transfer_image(): @@ -27,12 +36,17 @@ def transfer_image(): ssh_hook = get_connection(conn_id=connection_id, **kwargs) with ssh_hook.get_conn() as ssh_client: + ssh_client.s sftp_client = ssh_client.open_sftp() ssh_client.exec_command(command=f"mkdir -p {target}") + with requests.get(url, stream=True, verify=False) as r: with sftp_client.open(os.path.join(target, image_id), 'wb') as f: - shutil.copyfileobj(r.raw, f) - + f.set_pipelined(pipelined=True) + while chunk := r.raw.read(1024 * 10000): + content_to_write = memoryview(chunk) + f.write(content_to_write) + setup_task = PythonOperator( python_callable=setup, task_id='setup_connection') a_id = setup_task.output['return_value'] -- GitLab