diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 77520154d481b63a8d178a6b140dde18c03151f7..2df77965f41c2863cc6a01341242a97e03597aa9 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -103,7 +103,7 @@ full-deploy-production:
     - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "sudo mkdir -p /persistent_data && sudo mount /dev/vdb1 /persistent_data"
     - until ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP ls /finished_cloudinit >/dev/null 2>&1; do sleep 30; done # wait until cloudinit script is complete
     - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "sudo service docker restart" # to use the configured docker data path
-    - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "sudo /home/airflow/data-logistics-service/scripts/deployment.sh /home/airflow /home/airflow/data-logistics-service $PRODUCTION_DOMAIN $AIRFLOW__SECRETS__BACKEND $AIRFLOW__SECRETS__BACKEND_KWARGS"
+    - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "sudo /home/airflow/data-logistics-service/scripts/deployment.sh /home/airflow /home/airflow/data-logistics-service $PRODUCTION_DOMAIN $AIRFLOW__SECRETS__BACKEND $AIRFLOW__SECRETS__BACKEND_KWARGS $AIRFLOW_FERNET_KEY"
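+    # $AIRFLOW_FERNET_KEY is expected to be provided as a CI/CD variable; deployment.sh exports it so docker-compose can pick it up.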
     - echo "Done"
 
 # NOTE Light deployment did not perform well when the template/main.html file was changed (in case of the official airflow image being updated)
diff --git a/dags/image_transfer.py b/dags/image_transfer.py
index 7fbce7b40e123373a2e930a098cbe9df40f7f214..ba2da0642cbc4d2a517af7152e06943316ca7079 100644
--- a/dags/image_transfer.py
+++ b/dags/image_transfer.py
@@ -1,5 +1,4 @@
 import os
-import shutil
 import requests
 
 from airflow.decorators import dag, task
@@ -12,6 +11,12 @@ default_args = {
     'owner': 'airflow',
 }
 
+def file_exist(sftp, name):
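+    """Return the size of the remote file in bytes, or -1 if it cannot be stat'ed."""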
+    try:
+        r = sftp.stat(name)
+        return r.st_size
+    except IOError:
+        return -1
 
 @dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
 def transfer_image():
@@ -28,11 +33,27 @@ def transfer_image():
 
         with ssh_hook.get_conn() as ssh_client:
             sftp_client = ssh_client.open_sftp()
+            remote_name = os.path.join(target, image_id)
+            size = file_exist(sftp=sftp_client, name=remote_name)
+            if size > 0:
+                print(f"File {remote_name} exists and has {size} bytes")
+                force = params.get('force', True)
+                if force != True:
+                    return 0
+                print("Forcing overwrite")
+
             ssh_client.exec_command(command=f"mkdir -p {target}")
+
             with requests.get(url, stream=True, verify=False) as r:
-                with sftp_client.open(os.path.join(target, image_id), 'wb') as f:
-                    shutil.copyfileobj(r.raw, f)
-
+                with sftp_client.open(remote_name, 'wb') as f:
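+                    # Pipelined mode lets paramiko keep sending write requests without
+                    # waiting for the server to acknowledge each one, which speeds up large uploads.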
+                    f.set_pipelined(pipelined=True)
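+                    # Stream the HTTP response in ~1 MB chunks so the image is never held fully in memory.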
+                    while True:
+                        chunk = r.raw.read(1024 * 1000)
+                        if not chunk:
+                            break
+                        content_to_write = memoryview(chunk)
+                        f.write(content_to_write)
+
     setup_task = PythonOperator(
         python_callable=setup, task_id='setup_connection')
     a_id = setup_task.output['return_value']
diff --git a/dags/image_transfer_alt.py b/dags/image_transfer_alt.py
new file mode 100644
index 0000000000000000000000000000000000000000..f15ea5a48f6b0580d14e5d38b7a5441cfb10255a
--- /dev/null
+++ b/dags/image_transfer_alt.py
@@ -0,0 +1,67 @@
+import os
+import shutil
+import requests
+
+from airflow.decorators import dag, task
+from airflow.utils.dates import days_ago
+from airflow.operators.python import PythonOperator
+from airflow.models import Variable
+from just_reg import get_parameter
+from decors import setup, get_connection, remove
+
+default_args = {
+    'owner': 'airflow',
+}
+
+
+@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
+def transfer_image_alt():
+
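+    # Download the image from the eFlows4HPC image build service into the local working directory.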
+    @task
+    def im_download(connection_id, **kwargs):
+
+        work_dir = Variable.get("working_dir", default_var='/tmp/')
+
+        image_id = get_parameter(
+            'image_id', default='wordcount_skylake.sif', **kwargs)
+        url = f"https://bscgrid20.bsc.es/image_creation/images/download/{image_id}"
+
+        print(f"Putting {url} --> {work_dir} connection")
+        with requests.get(url, stream=True, verify=False) as r:
+            with open(os.path.join(work_dir, image_id), 'wb') as f:
+                shutil.copyfileobj(r.raw, f)
+
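+    # Upload the previously downloaded image to the target system over SFTP, then delete the local copy.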
+    @task
+    def im_upload(connection_id, **kwargs):
+        if not get_parameter('upload', False, **kwargs):
+            print('Skipping upload')
+            return 0
+        work_dir = Variable.get("working_dir", default_var='/tmp/')
+        target = get_parameter('target', default='/tmp/', **kwargs)
+        image_id = get_parameter(
+            'image_id', default='wordcount_skylake.sif', **kwargs)
+        ssh_hook = get_connection(conn_id=connection_id, **kwargs)
+        print(
+            f"Copying local {os.path.join(work_dir, image_id)} -> {connection_id}:{target}")
+        with ssh_hook.get_conn() as ssh_client:
+            sftp_client = ssh_client.open_sftp()
+            ssh_client.exec_command(command=f"mkdir -p {target}")
+            with open(os.path.join(work_dir, image_id), 'rb') as r:
+                with sftp_client.open(os.path.join(target, image_id), 'wb') as f:
+                    shutil.copyfileobj(r, f)
+
+        print('Removing local copy')
+        os.unlink(os.path.join(work_dir, image_id))
+
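+    # Wire the tasks together: set up the SSH connection, download, upload, then remove the temporary connection.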
+    setup_task = PythonOperator(
+        python_callable=setup, task_id='setup_connection')
+    a_id = setup_task.output['return_value']
+
+    cleanup_task = PythonOperator(python_callable=remove, op_kwargs={
+                                  'conn_id': a_id}, task_id='cleanup')
+
+    setup_task >> im_download(connection_id=a_id) >> im_upload(
+        connection_id=a_id) >> cleanup_task
+
+
+dag = transfer_image_alt()
diff --git a/dags/just_reg.py b/dags/just_reg.py
index 827eef9a82cfe77a2c9a44d5ddfa5bc6bd9ffa2b..8018a652805b214c7b286a4903b0e55037a98571 100644
--- a/dags/just_reg.py
+++ b/dags/just_reg.py
@@ -85,12 +85,13 @@ def datacat_registration_example():
 
 
 
-    step1 = BashOperator(bash_command='ls', task_id='nothing')
+    step1 = BashOperator(bash_command='curl -X GET -k https://bscgrid20.bsc.es/image_creation/images/download/wordcount_skylake.sif -o /work/ww', task_id='nothing')
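+    # step1 downloads a test image to /work/ww; the temporary file is removed again by the 'remov' task below.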
     step2 = register(
         object_url='https://b2share-testing.fz-juelich.de/records/7a12fda26b2a4d248f96d012d54769b7')
 
     step3 = get_template()
-    step1 >> step2 >> step3
+    stepr = BashOperator(bash_command='rm /work/ww', task_id='remov')
+    step1 >> step2 >> step3 >> stepr
 
 
 dag = datacat_registration_example()
diff --git a/dags/uploadflow.py b/dags/uploadflow.py
index 4ceacb99bef508f0028bdc98a30f8c8c661b6e2b..17c845bcb56bff1b3b5df3d6404cf4ac6f87431c 100644
--- a/dags/uploadflow.py
+++ b/dags/uploadflow.py
@@ -152,7 +152,7 @@ def upload_example():
 
     reg = register(object_url=uid)
 
-    setup_task >> files >> uid >> en
+    setup_task >> files >> uid >> reg >> en
 
 
 dag = upload_example()
diff --git a/dockers/docker-compose.yaml b/dockers/docker-compose.yaml
index 9ec28f17d0282f6901466187fcdb40c74491c919..a9de206505b67aa9aa27108fff50029cb311c4bc 100644
--- a/dockers/docker-compose.yaml
+++ b/dockers/docker-compose.yaml
@@ -52,7 +52,7 @@ x-airflow-common:
     AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
     AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
     AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
-    AIRFLOW__CORE__FERNET_KEY: ''
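+    # Supplied via the environment at deploy time (exported by scripts/deployment.sh).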
+    AIRFLOW__CORE__FERNET_KEY: ${AIRFLOW__CORE__FERNET_KEY}
     AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
     AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
     AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
diff --git a/docs/apirequests.adoc b/docs/apirequests.adoc
index d3276c76bb3e9c721bc085b9d5e6c720d371131e..3b5d9a6b8f14075ca580d7166e924fd9c2454a14 100644
--- a/docs/apirequests.adoc
+++ b/docs/apirequests.adoc
@@ -118,7 +118,7 @@ Optionally, the record created in b2share can be registered with data cat. This
 To transfer images from eFlows4HPC image build service use dag defined in +dags/image_transfer.py+ (transfer_image). It requires two parameters +image_id+ name of the image in the image
 build service (e.g. "wordcount_skylake.sif") and +target+ which defines a path on the system where the image will be transfered to. 
 
-The parameters should be passed along the credentials as described in <<credentials>>. The target directory will be created with ``mkdir -p`` on the target machine. The image is streamed directly to the target location (no local copy on DLS worker).
+The parameters should be passed along with the credentials as described in <<credentials>>. The target directory will be created with ``mkdir -p`` on the target machine. The image is streamed directly to the target location (no local copy on the DLS worker). By default, the file at the target location is overwritten; this can be disabled by passing +force='false'+ as a DAG parameter.
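+
+For example, a request that disables the overwrite could look as follows (a sketch only: the +dagRuns+ endpoint path and host name are assumed here and should be adapted to the actual deployment):
+
+----
+curl -X POST -u USER:PASS -H "Content-Type: application/json" \
+   --data '{"conf": {"image_id": "wordcount_skylake.sif", "target": "/tmp/", "force": "false"}}' \
+   https://SERVER_DOMAIN/api/v1/dags/transfer_image/dagRuns
+----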
 
 ----
 curl -X POST -u USER:PASS -H "Content-Type: application/json" \
diff --git a/scripts/deployment.sh b/scripts/deployment.sh
index 8f618dec9daa4c2211e80f8ee954e642e628f91b..71c4fab314a6166cdd591eee81867361d6c8214f 100755
--- a/scripts/deployment.sh
+++ b/scripts/deployment.sh
@@ -14,10 +14,15 @@ if [ -z ${2+x} ]; then echo "No user input for starting repository location. Def
 if [ -z ${3+x} ]; then export SERVER_DOMAIN=dls.fz-juelich.de; else export SERVER_DOMAIN=$3; fi
 if [ -z ${4+x} ]; then unset AIRFLOW__SECRETS__BACKEND; else export AIRFLOW__SECRETS__BACKEND=$4; fi
 if [ -z ${5+x} ]; then unset AIRFLOW__SECRETS__BACKEND_KWARGS; else export AIRFLOW__SECRETS__BACKEND_KWARGS=$5; fi
+if [ -z ${6+x} ]; then unset AIRFLOW__CORE__FERNET_KEY; else export AIRFLOW__CORE__FERNET_KEY=$6; fi
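+# The exported Fernet key is consumed by dockers/docker-compose.yaml via ${AIRFLOW__CORE__FERNET_KEY}.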
+
+
 
 echo "DEBUG values: OLD_DIR=$OLD_DIR, ENTRYPOINT_DIR=$ENTRYPOINT and GIT_REPO=$GIT_REPO"
 echo "DEBUG using secrets backend: $AIRFLOW__SECRETS__BACKEND"
 echo "DEBUG backend args length: ${#AIRFLOW__SECRETS__BACKEND_KWARGS}"
+echo "DEBUG fernet key: ${AIRFLOW__CORE__FERNET_KEY}"
+
 
 cd $ENTRYPOINT
 mkdir -p eflows-airflow