Skip to content
Snippets Groups Projects
Commit 6861c0e8 authored by Jedrzej Rybicki's avatar Jedrzej Rybicki
Browse files

pipeline to save the time

parent 7bed9058
No related branches found
No related tags found
No related merge requests found
Pipeline #97397 failed
......@@ -5,6 +5,7 @@ import requests
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
import paramiko
from decors import setup, get_connection, remove
......@@ -12,6 +13,14 @@ default_args = {
'owner': 'airflow',
}
class FastTransport(paramiko.Transport):
def __init__(self, sock):
super(FastTransport, self).__init__(sock)
self.window_size = 2147483647
self.packetizer.REKEY_BYTES = pow(2, 40)
self.packetizer.REKEY_PACKETS = pow(2, 40)
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def transfer_image():
......@@ -27,11 +36,16 @@ def transfer_image():
ssh_hook = get_connection(conn_id=connection_id, **kwargs)
with ssh_hook.get_conn() as ssh_client:
ssh_client.s
sftp_client = ssh_client.open_sftp()
ssh_client.exec_command(command=f"mkdir -p {target}")
with requests.get(url, stream=True, verify=False) as r:
with sftp_client.open(os.path.join(target, image_id), 'wb') as f:
shutil.copyfileobj(r.raw, f)
f.set_pipelined(pipelined=True)
while chunk := r.raw.read(1024 * 10000):
content_to_write = memoryview(chunk)
f.write(content_to_write)
setup_task = PythonOperator(
python_callable=setup, task_id='setup_connection')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment