diff --git a/dags/decors.py b/dags/decors.py
index 2f9fd133aef585975fc871f4f9c70fb8d712f297..034e6a30be3367fb56624d67542c3e3237c6848a 100644
--- a/dags/decors.py
+++ b/dags/decors.py
@@ -39,7 +39,7 @@ def get_connection(conn_id, **kwargs):
         # for now SSH is hardcoded
         params = kwargs['params']
         host = params.get('host')
-        port = int(params.get('port', 2222))
+        port = int(params.get('port', 22))
         user = params.get('login', 'eflows')
         hook = SSHHook(remote_host=host, port=port, username=user)
         # key in vault should be in the form of a formatted string:
diff --git a/dags/image_transfer.py b/dags/image_transfer.py
new file mode 100644
index 0000000000000000000000000000000000000000..7fbce7b40e123373a2e930a098cbe9df40f7f214
--- /dev/null
+++ b/dags/image_transfer.py
@@ -0,0 +1,45 @@
+import os
+import shutil
+import requests
+
+from airflow.decorators import dag, task
+from airflow.utils.dates import days_ago
+from airflow.operators.python import PythonOperator
+
+from decors import setup, get_connection, remove
+
+default_args = {
+    'owner': 'airflow',
+}
+
+
+@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
+def transfer_image():
+
+    @task
+    def stream_upload(connection_id, **kwargs):
+        params = kwargs['params']
+        target = params.get('target', '/tmp/')
+        image_id = params.get('image_id', 'wordcount_skylake.sif')
+        url = f"https://bscgrid20.bsc.es/image_creation/images/download/{image_id}"
+
+        print(f"Putting {url} --> {target} connection")
+        ssh_hook = get_connection(conn_id=connection_id, **kwargs)
+
+        with ssh_hook.get_conn() as ssh_client:
+            sftp_client = ssh_client.open_sftp()
+            # ensure the target directory exists on the remote host
+            ssh_client.exec_command(command=f"mkdir -p {target}")
+            # note: TLS verification is disabled for the image build service
+            with requests.get(url, stream=True, verify=False) as r:
+                r.raise_for_status()
+                # stream the image straight to the remote file (no local copy)
+                with sftp_client.open(os.path.join(target, image_id), 'wb') as f:
+                    shutil.copyfileobj(r.raw, f)
+
+    setup_task = PythonOperator(python_callable=setup, task_id='setup_connection')
+    a_id = setup_task.output['return_value']
+    cleanup_task = PythonOperator(
+        python_callable=remove, op_kwargs={'conn_id': a_id}, task_id='cleanup')
+
+    setup_task >> stream_upload(connection_id=a_id) >> cleanup_task
+
+
+dag = transfer_image()
diff --git a/dags/uploadflow.py b/dags/uploadflow.py
index e38a40f045ede3ab62ac45ef1b5a564b49e4fa0b..ff70aff2ab6e509fca0baca75d9c8ef55d26ec17 100644
--- a/dags/uploadflow.py
+++ b/dags/uploadflow.py
@@ -105,8 +105,7 @@ def upload_example():
 
 
 
-    setup_task = PythonOperator(
-        python_callable=setup, task_id='setup_connection')
+    setup_task = PythonOperator(python_callable=setup, task_id='setup_connection')
     a_id = setup_task.output['return_value']
 
     files = load(connection_id=a_id)
diff --git a/docs/apirequests.adoc b/docs/apirequests.adoc
index 741c9a97f854fe7617d9fe7fe1c6f40e05d01fc6..159bd412f29aee8d2fde4b619da569abb068c5bc 100644
--- a/docs/apirequests.adoc
+++ b/docs/apirequests.adoc
@@ -112,6 +112,18 @@ curl -X POST -u USER:PASS -H "Content-Type: application/json" \
 ----
 
 
+=== Image transfer example ===
+To transfer images from the eFlows4HPC image build service, use the DAG defined in +dags/image_transfer.py+ (transfer_image). It requires two parameters: +image_id+, the name of the image in the image
+build service (e.g. "wordcount_skylake.sif"), and +target+, the path on the target system to which the image will be transferred.
+
+The parameters should be passed along with the credentials as described in <<credentials>>. The target directory will be created with ``mkdir -p`` on the target machine. The image is streamed directly to the target location (no local copy on the DLS worker).
+
+----
+curl -X POST -u USER:PASS -H "Content-Type: application/json" \
+   --data '{"conf": {"image_id": imageID, "target": PATH}}' \
+   $DLS/dags/transfer_image/dagRuns
+----
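+
+The same request can be issued from Python; below is a minimal sketch using the +requests+ library. The base URL, user, and password are placeholders, as above:
+
+----
+import requests
+
+# hypothetical base URL of the DLS Airflow instance
+DLS = "https://dls.example.com"
+
+response = requests.post(
+    f"{DLS}/dags/transfer_image/dagRuns",
+    auth=("USER", "PASS"),
+    json={"conf": {"image_id": "IMAGE_ID", "target": "PATH"}},
+)
+response.raise_for_status()
+print(response.json())
+----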
+
 
 === Comments ===
 I could imagine that the name of the DLS pipeline (+taskflow_example+) could change and would need to be passed as a parameter to YORC.