Skip to content
Snippets Groups Projects
Select Git revision
  • 4fb72732bc64e9f576a3e435f67ad1c76b77e885
  • main default protected
  • airflow-2.7.0 protected
  • airflow253 protected
  • air251
  • test_docker_op
  • airflow225
  • mptest
  • https-deployment
  • datacat_integration protected
  • datacatalog-integration
  • stable-2.2.2 protected
  • stable-2.2.1 protected
  • stable-2.2.0 protected
  • stable-2.1.4 protected
  • stable-2.1.3 protected
  • stable-2.1.2 protected
  • stable-2.1.1 protected
  • stable-2.1.0 protected
  • stable-2.0.2 protected
  • stable-2.0.1 protected
  • stable-2.0.0 protected
  • stable-1.0.1 protected
  • stable-1.0 protected
  • stable-0.1 protected
25 results

image_transfer.py

Blame
  • image_transfer.py 2.09 KiB
    import os
    import requests
    
    from airflow.decorators import dag, task
    from airflow.utils.dates import days_ago
    from airflow.operators.python import PythonOperator
    from dags.uploadflow import copy_streams
    
    from decors import setup, get_connection, remove
    
    # Default arguments handed to the @dag decorator below.
    default_args = dict(owner='airflow')
    
    def file_exist(sftp, name):
        """Return the size in bytes of remote file ``name``, or -1 if it cannot be stat'ed.

        Args:
            sftp: object exposing ``stat(path)`` whose result has an ``st_size``
                attribute (an SFTP client in this file).
            name: remote path to check.

        Returns:
            ``st_size`` of the remote file, or ``-1`` when ``stat`` fails with an
            ``OSError`` (missing file, permission denied, ...).
        """
        try:
            return sftp.stat(name).st_size
        except OSError:
            # Only I/O failures mean "not there"; interrupts and programming
            # errors are no longer silently swallowed.
            return -1
    
    @dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
    def transfer_image():
        """Define a DAG that streams an image over HTTP onto a remote host via SFTP.

        Task order: ``setup_connection`` >> ``stream_upload`` >> ``cleanup``.
        The return value of the setup task is passed on as the connection id to
        both the upload task and the cleanup task.
        """

        @task
        def stream_upload(connection_id, **kwargs):
            """Download the image and write it below ``target`` on the remote host.

            Values read from ``kwargs['params']``:
                target: remote directory (default ``'/tmp/'``).
                image_id: image file name (default ``'wordcount_skylake.sif'``).
                force: overwrite an existing non-empty file (default ``True``).

            Returns:
                ``0`` if the remote file already exists with a positive size and
                ``force`` is not ``True``; otherwise ``None``.

            Raises:
                requests.HTTPError: if the download endpoint answers with an
                    error status (nothing is written to the remote file then).
            """
            # Local import so the file-level import block stays untouched.
            import shlex

            params = kwargs['params']
            target = params.get('target', '/tmp/')
            image_id = params.get('image_id', 'wordcount_skylake.sif')
            url = f"https://bscgrid20.bsc.es/image_creation/images/download/{image_id}"

            print(f"Putting {url} --> {target} connection")
            ssh_hook = get_connection(conn_id=connection_id, **kwargs)

            with ssh_hook.get_conn() as ssh_client:
                sftp_client = ssh_client.open_sftp()
                remote_name = os.path.join(target, image_id)
                size = file_exist(sftp=sftp_client, name=remote_name)
                if size > 0:
                    print(f"File {remote_name} exists and has {size} bytes")
                    force = params.get('force', True)
                    # Equality (not identity) on purpose: keeps the original
                    # semantics for values such as 1.
                    if force != True:
                        return 0
                    print("Forcing overwrite")

                # `target` comes from run parameters: quote it so it cannot inject
                # shell syntax. SFTP uses the path literally, so the shell must too.
                _, mkdir_stdout, _ = ssh_client.exec_command(
                    command=f"mkdir -p {shlex.quote(target)}")
                # exec_command returns immediately; wait for mkdir to finish so
                # the directory exists before the remote file is opened.
                mkdir_stdout.channel.recv_exit_status()

                # NOTE(review): verify=False disables TLS certificate checks —
                # confirm this is intentional for this endpoint.
                with requests.get(url, stream=True, verify=False) as r:
                    # Fail before touching the remote file; otherwise an HTTP
                    # error page would be stored as the image.
                    r.raise_for_status()
                    with sftp_client.open(remote_name, 'wb') as f:
                        f.set_pipelined(pipelined=True)
                        copy_streams(input=r, output=f)

        setup_task = PythonOperator(
            python_callable=setup, task_id='setup_connection')
        a_id = setup_task.output['return_value']
        cleanup_task = PythonOperator(python_callable=remove, op_kwargs={
                                      'conn_id': a_id}, task_id='cleanup')

        setup_task >> stream_upload(connection_id=a_id) >> cleanup_task
    
    
    # Call the decorated factory once at import time and keep the result in a
    # module-level name (presumably so Airflow's DAG discovery finds it — confirm).
    # NOTE: this rebinds `dag`, shadowing the decorator imported above.
    dag = transfer_image()