diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 6fd6f204e71a104c8f151ce34c311807235bf6aa..3b978f3a3ad6b56f3fe322ae500325b755e145ff 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -14,6 +14,7 @@ variables:
   PRODUCTION_DOMAIN: datalogistics.eflows4hpc.eu
   AIRFLOW_TESTUSER: "airflow"
   AIRFLOW__SECRETS__BACKEND: datacat_integration.secrets.DatacatSecretsBackend
+  DAG_GIT_URL: https://github.com/eflows4hpc/dls-dags
   VOLUME_ID: 6b58c3a6-691b-496a-8afd-153637c2de48
   DOCKER_TLS_CERTDIR: ""
 
@@ -102,7 +103,7 @@ full-deploy-production:
     - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "sudo mkdir -p /persistent_data && sudo mount /dev/vdb1 /persistent_data"
     - until ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP ls /finished_cloudinit >/dev/null 2>&1; do sleep 30; done # wait until cloudinit script is complete
     - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "sudo service docker restart" # to use the configured docker data path
-    - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "sudo /home/airflow/data-logistics-service/scripts/deployment.sh /home/airflow /home/airflow/data-logistics-service $PRODUCTION_DOMAIN $AIRFLOW__SECRETS__BACKEND $AIRFLOW__SECRETS__BACKEND_KWARGS $AIRFLOW_FERNET_KEY"
+    - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "sudo /home/airflow/data-logistics-service/scripts/deployment.sh /home/airflow /home/airflow/data-logistics-service $PRODUCTION_DOMAIN $AIRFLOW__SECRETS__BACKEND $AIRFLOW__SECRETS__BACKEND_KWARGS $AIRFLOW_FERNET_KEY $DAG_GIT_URL"
     - echo "Done"
 
 # NOTE Light deployment did not perform well when the template/main.html file was changed (in case of the official airflow image being updated)
@@ -119,7 +120,7 @@ light-deploy-production:
   environment: Production
   script:
     - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "cd /home/airflow/data-logistics-service && git stash && git stash clear && git checkout main && git checkout -f $CI_COMMIT_TAG && git pull --all"
-    - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "sudo /home/airflow/data-logistics-service/scripts/deployment.sh /home/airflow /home/airflow/data-logistics-service $PRODUCTION_DOMAIN $AIRFLOW__SECRETS__BACKEND $AIRFLOW__SECRETS__BACKEND_KWARGS" $AIRFLOW_FERNET_KEY
+    - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "sudo /home/airflow/data-logistics-service/scripts/deployment.sh /home/airflow /home/airflow/data-logistics-service $PRODUCTION_DOMAIN $AIRFLOW__SECRETS__BACKEND $AIRFLOW__SECRETS__BACKEND_KWARGS $AIRFLOW_FERNET_KEY $DAG_GIT_URL"
 
 test-production-webserver:
   cache: {}
diff --git a/dags/image_transfer.py b/dags/image_transfer.py
index ba2da0642cbc4d2a517af7152e06943316ca7079..a517bb2bc29baa020898c1d6fe3942406aa3429f 100644
--- a/dags/image_transfer.py
+++ b/dags/image_transfer.py
@@ -4,6 +4,7 @@ import requests
 from airflow.decorators import dag, task
 from airflow.utils.dates import days_ago
 from airflow.operators.python import PythonOperator
+from dags.uploadflow import copy_streams
 
 from decors import setup, get_connection, remove
 
@@ -21,6 +22,7 @@ def file_exist(sftp, name):
 
 @dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
 def transfer_image():
+
     @task
     def stream_upload(connection_id, **kwargs):
         params = kwargs['params']
@@ -47,12 +49,7 @@ def transfer_image():
         with requests.get(url, stream=True, verify=False) as r:
             with sftp_client.open(remote_name, 'wb') as f:
                 f.set_pipelined(pipelined=True)
-                while True:
-                    chunk=r.raw.read(1024 * 1000)
-                    if not chunk:
-                        break
-                    content_to_write = memoryview(chunk)
-                    f.write(content_to_write)
+                copy_streams(input=r, output=f)
 
     setup_task = PythonOperator(
         python_callable=setup, task_id='setup_connection')
diff --git a/dags/uploadflow.py b/dags/uploadflow.py
index 72c4d7e0d1f0f1737c27bd94d8543c4cf94c5c5c..e2b04a2563ee9d5e909ae47788eabde6271b4f0c 100644
--- a/dags/uploadflow.py
+++ b/dags/uploadflow.py
@@ -38,6 +38,43 @@ def create_template(hrespo):
         "open_access": hrespo['open_access'] == "True"
     }
 
+def copy_streams(input, output, chunk_size = 1024 * 1000):
+    while True:
+        chunk=input.raw.read(chunk_size)
+        if not chunk:
+            break
+        content_to_write = memoryview(chunk)
+        output.write(content_to_write)
+
+
+def ssh_download(sftp_client, remote, local):
+    #sftp_client.get(remote, local)
+    with sftp_client.open(remote, 'rb') as input:
+        with open(local, 'wb') as output:
+            input.set_pipelined(pipelined=True)
+            copy_streams(input=input, output=output)
+
+
+def ssh2local_copy(ssh_hook, source: str, target: str):
+    with ssh_hook.get_conn() as ssh_client:
+        sftp_client = ssh_client.open_sftp()
+        lst = sftp_client.listdir(path=source)
+
+        print(f"{len(lst)} objects in {source}")
+        mappings = dict()
+        for fname in lst:
+            local = tempfile.mktemp(prefix='dls', dir=target)
+            full_name = os.path.join(source, fname)
+            sts = sftp_client.stat(full_name)
+            if str(sts).startswith('d'):
+                print(f"{full_name} is a directory. Skipping")
+                continue
+
+            print(f"Copying {full_name} --> {local}")
+            ssh_download(sftp_client=sftp_client, remote=full_name, local=local)
+            mappings[local] = fname
+
+    return mappings
 
 @dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
 def upload_example():
@@ -47,24 +84,9 @@ def upload_example():
         params = kwargs['params']
         target = Variable.get("working_dir", default_var='/tmp/')
         source = params.get('source', '/tmp/')
-
         ssh_hook = get_connection(conn_id=connection_id, **kwargs)
-        with ssh_hook.get_conn() as ssh_client:
-            sftp_client = ssh_client.open_sftp()
-            lst = sftp_client.listdir(path=source)
-            mappings = dict()
-            for fname in lst:
-                local = tempfile.mktemp(prefix='dls', dir=target)
-                full_name = os.path.join(source, fname)
-                sts = sftp_client.stat(full_name)
-                if str(sts).startswith('d'):
-                    print(f"{full_name} is a directory. Skipping")
Skipping") - continue - - print(f"Copying {connection_id}:{full_name} --> {local}") - sftp_client.get(os.path.join(source, fname), local) - mappings[local] = fname + mappings = ssh2local_copy(ssh_hook=ssh_hook, source=source, target=target) return mappings @task() @@ -135,7 +157,7 @@ def upload_example(): except ConnectionError as e: print('Registration failed', e) return -1 - + setup_task = PythonOperator(python_callable=setup, task_id='setup_connection') a_id = setup_task.output['return_value'] @@ -146,7 +168,7 @@ def upload_example(): 'conn_id': a_id}, task_id='cleanup') reg = register(object_url=uid) - + setup_task >> files >> uid >> reg >> en diff --git a/dags/webdav_example.py b/dags/webdav_example.py new file mode 100644 index 0000000000000000000000000000000000000000..9829bfda9a33f7fbd2533763d3a934afe2310313 --- /dev/null +++ b/dags/webdav_example.py @@ -0,0 +1,74 @@ +import os + +from airflow.decorators import dag, task +from airflow.models import Variable +from airflow.models.connection import Connection +from airflow.operators.python import PythonOperator +from airflow.utils.dates import days_ago +from webdav3.client import Client + +from uploadflow import ssh2local_copy +from decors import get_connection, remove, setup + +default_args = { + 'owner': 'airflow', +} + + +@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example']) +def webdav_upload(): + + @task() + def download(connection_id, **kwargs): + + params = kwargs['params'] + target = Variable.get("working_dir", default_var='/tmp/') + source = params.get('source', '/tmp/') + ssh_hook = get_connection(conn_id=connection_id, **kwargs) + + mappings = ssh2local_copy(ssh_hook=ssh_hook, source=source, target=target) + + return mappings + + @task() + def load(mappings, **kwargs): + params = kwargs['params'] + target = params.get('target', '/airflow-test') + connection = Connection.get_connection_from_secrets('b2drop_webdav') + options = {'webdav_hostname': f"https://{connection.host}{connection.schema}", + 'webdav_login': connection.login, + 'webdav_password': connection.get_password() + } + print(f"Translated http to webdav: {options}") + client = Client(options) + res = client.mkdir(target) + print(f"Creating {target}: {'ok' if res else 'failed'}") + + print(f"Starting upload -> {target}") + for [local, true_name] in mappings.items(): + full_name = full_name = os.path.join(target, true_name) + print(f"Processing {local} --> {full_name}") + client.upload_sync(remote_path=full_name, local_path=local) + + # delete local + os.unlink(local) + + return True + + @task + def print_stats(res): + print('Finished') + + setup_task = PythonOperator( + python_callable=setup, task_id='setup_connection') + a_id = setup_task.output['return_value'] + + mappings = download(connection_id=a_id) + res = load(mappings=mappings) + + en = PythonOperator(python_callable=remove, op_kwargs={ + 'conn_id': a_id}, task_id='cleanup') + res >> en + + +dag = webdav_upload() diff --git a/requirements.txt b/requirements.txt index d0b08be1b6301433865d9c782fa87d403f20991a..cb40ab776ed5a9aa04d12e6d3851f0e5dfbf682d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,9 @@ requests urllib3 plyvel +webdavclient3 apache-airflow-providers-ssh apache-airflow-providers-http apache-airflow-providers-sftp --index-url https://gitlab.jsc.fz-juelich.de/api/v4/projects/4405/packages/pypi/simple -airflow-datacat-integration>=0.1.3 +airflow-datacat-integration>=0.1.4 diff --git a/scripts/cloudinit.yml b/scripts/cloudinit.yml index 
index 65e09ede421ec01f6974fcd9c59a04a80b19be56..010991b03b61d40cbfd8a9a402d76b146d914e37 100644
--- a/scripts/cloudinit.yml
+++ b/scripts/cloudinit.yml
@@ -69,6 +69,7 @@ runcmd:
   - sudo -u airflow git clone https://gitlab.jsc.fz-juelich.de/eflows4hpc-wp2/data-logistics-service.git ./data-logistics-service
   - cd ./data-logistics-service
   - crontab -l | { cat ; echo '@daily root find /persistent_data/logs -mtime +13 -type f -delete'; } | crontab - # setup log clearing crontab
+  - crontab -l | { cat ; echo '*/5 * * * * cd /home/airflow/eflows-airflow/dags && sudo git pull >/dev/null 2>&1'; } | crontab - # setup dag crontab
   - touch /finished_cloudinit
 
 final_message: "The system is finally up, after $UPTIME seconds"
\ No newline at end of file
diff --git a/scripts/deployment.sh b/scripts/deployment.sh
index 5f0930aef687d199ebde04a67a9ff2f28ffd0a3a..78bf11ac89e4b9f79522cc5f408c731a48cb0c39 100755
--- a/scripts/deployment.sh
+++ b/scripts/deployment.sh
@@ -2,7 +2,7 @@
 # @author Maria Petrova & Christian Böttcher
 ## USAGE:
 #
-# deployment.sh <user_home_directory> <git_directory> [SERVER_DOMAIN] [AIRFLOW__SECRETS__BACKEND] [AIRFLOW__SECRETS__BACKEND_KWARGS]
+# deployment.sh <user_home_directory> <git_directory> [SERVER_DOMAIN] [AIRFLOW__SECRETS__BACKEND] [AIRFLOW__SECRETS__BACKEND_KWARGS] [AIRFLOW__CORE__FERNET_KEY] [DAG_GIT_URL]
 
 OLD_DIR=`pwd`
 GIT_REPO=$HOME/data-logistics-service
@@ -15,6 +15,7 @@ if [ -z ${3+x} ]; then export SERVER_DOMAIN=dls.fz-juelich.de; else export SERVE
 if [ -z ${4+x} ]; then unset AIRFLOW__SECRETS__BACKEND; else export AIRFLOW__SECRETS__BACKEND=$4; fi
 if [ -z ${5+x} ]; then unset AIRFLOW__SECRETS__BACKEND_KWARGS; else export AIRFLOW__SECRETS__BACKEND_KWARGS=$5; fi
 if [ -z ${6+x} ]; then unset AIRFLOW__CORE__FERNET_KEY; else export AIRFLOW__CORE__FERNET_KEY=$6; fi
+if [ -z ${7+x} ]; then unset DAG_GIT_URL; else export DAG_GIT_URL=$7; fi
 
 
 
@@ -22,6 +23,7 @@ echo "DEBUG values: OLD_DIR=$OLD_DIR, ENTRYPOINT_DIR=$ENTRYPOINT and GIT_REPO=$G
 echo "DEBUG using secrets backend: $AIRFLOW__SECRETS__BACKEND"
 echo "DEBUG backend args length: ${#AIRFLOW__SECRETS__BACKEND_KWARGS}"
 #echo "DEBUG fernet key: ${AIRFLOW__CORE__FERNET_KEY}"
+echo "DEBUG DAG git dir: $DAG_GIT_URL"
 
 cd $ENTRYPOINT
 
@@ -38,7 +40,7 @@ echo "Proceeding as user $(whoami)"
 # Make the necessary folders for the airflow artefacts and copy the corresponding content
 mkdir -p ./dags ./logs ./plugins ./config ./templates
 cd $GIT_REPO
-cp -r dags/* $AIRFLOW_DIR/dags
+rm -rf $AIRFLOW_DIR/dags/* && rm -rf $AIRFLOW_DIR/dags/.git && git clone $DAG_GIT_URL $AIRFLOW_DIR/dags
 cp -r plugins/* $AIRFLOW_DIR/plugins
 cp config/* $AIRFLOW_DIR/config/
 cp -r templates/* $AIRFLOW_DIR/templates
diff --git a/tests/test_ssh.py b/tests/test_ssh.py
new file mode 100644
index 0000000000000000000000000000000000000000..ac2ad3664e9b128c8ee8d4bf963c909b189b28a3
--- /dev/null
+++ b/tests/test_ssh.py
@@ -0,0 +1,74 @@
+import tempfile
+import unittest
+from unittest.mock import MagicMock, patch
+import os
+
+from dags.uploadflow import ssh2local_copy, copy_streams
+
+class TestSSH(unittest.TestCase):
+
+    @patch('dags.uploadflow.tempfile.mktemp')
+    def test_copy_files(self, tmp):
+        tmp.side_effect = ['tmpA', 'tmpB']
+
+        my_hook = MagicMock()
+        a = MagicMock()
+        a.return_value = ['a', 'c']
+        stat = MagicMock(side_effect=['elo', 'elo'])
+        cpy = MagicMock(return_value=False)
+        my_hook.get_conn().__enter__().open_sftp().listdir = a
+        my_hook.get_conn().__enter__().open_sftp().stat = stat
+        my_hook.get_conn().__enter__().open_sftp().open().__enter__().raw.read = cpy
+
+        mapps = ssh2local_copy(ssh_hook=my_hook, source='srcZ', target='trg')
+        my_hook.get_conn.assert_any_call()
+        a.assert_called_once_with(path='srcZ')
+        cpy.assert_called()
+
+        print(mapps)
+        self.assertEqual(len(mapps), 2)
+
+
+    @patch('dags.uploadflow.tempfile.mktemp')
+    def test_skipdir_files(self, tmp):
+        tmp.side_effect = ['tmpA', 'tmpB']
+
+        my_hook = MagicMock()
+        a = MagicMock()
+        a.return_value = ['a', 'c']
+        stat = MagicMock(side_effect=['elo', 'd elo'])
+        cpy = MagicMock(return_value=False)
+        my_hook.get_conn().__enter__().open_sftp().listdir = a
+        my_hook.get_conn().__enter__().open_sftp().stat = stat
+        my_hook.get_conn().__enter__().open_sftp().open().__enter__().raw.read = cpy
+
+        mapps = ssh2local_copy(ssh_hook=my_hook, source='srcZ', target='trg')
+        my_hook.get_conn.assert_any_call()
+        a.assert_called_once_with(path='srcZ')
+        cpy.assert_called()
+
+        print(mapps)
+
+        self.assertEqual(len(mapps), 1)
+
+
+    def test_copy_streams(self):
+        """
+        def copy_streams(input, output):
+        """
+        with tempfile.TemporaryDirectory() as dir:
+            text = 'Some input text'
+            input_name = os.path.join(dir,'input.txt')
+            output_name = os.path.join(dir, 'output')
+            with open(input_name, 'w') as fln:
+                fln.write(text)
+
+            with open(input_name, 'rb') as input:
+                with open(output_name, 'wb') as output:
+                    copy_streams(input=input, output=output)
+
+            with open(output_name, 'r') as f:
+                txt = f.read()
+            print("Read following: ", txt)
+
+            self.assertEqual(text, txt)
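Note on the new helper: copy_streams (added to dags/uploadflow.py above) expects an input object that exposes a .raw stream, which is how dags/image_transfer.py now pipes an HTTP download straight into an SFTP file handle. Below is a minimal standalone sketch of that pattern; it is not part of the patch, the URL and output path are placeholders, and a plain local file stands in for the SFTP handle.

import requests

from dags.uploadflow import copy_streams

# Placeholder source and destination, for illustration only.
url = "https://example.org/some/image.img"
with requests.get(url, stream=True, verify=False) as r:
    with open("/tmp/dls_image.img", "wb") as f:
        # copy_streams reads r.raw in 1000 KiB chunks and writes each chunk to f
        copy_streams(input=r, output=f)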