Commit 57f64cf8 authored by Maria Petrova-El Sayed

Merge branch 'main' of gitlab.jsc.fz-juelich.de:eflows4hpc-wp2/data-logistics-service

parents 25367313 4fb72732
Pipeline #107364 passed
@@ -14,6 +14,7 @@ variables:
   PRODUCTION_DOMAIN: datalogistics.eflows4hpc.eu
   AIRFLOW_TESTUSER: "airflow"
   AIRFLOW__SECRETS__BACKEND: datacat_integration.secrets.DatacatSecretsBackend
+  DAG_GIT_URL: https://github.com/eflows4hpc/dls-dags
   VOLUME_ID: 6b58c3a6-691b-496a-8afd-153637c2de48
   DOCKER_TLS_CERTDIR: ""
@@ -102,7 +103,7 @@ full-deploy-production:
     - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "sudo mkdir -p /persistent_data && sudo mount /dev/vdb1 /persistent_data"
     - until ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP ls /finished_cloudinit >/dev/null 2>&1; do sleep 30; done # wait until cloudinit script is complete
     - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "sudo service docker restart" # to use the configured docker data path
-    - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "sudo /home/airflow/data-logistics-service/scripts/deployment.sh /home/airflow /home/airflow/data-logistics-service $PRODUCTION_DOMAIN $AIRFLOW__SECRETS__BACKEND $AIRFLOW__SECRETS__BACKEND_KWARGS $AIRFLOW_FERNET_KEY"
+    - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "sudo /home/airflow/data-logistics-service/scripts/deployment.sh /home/airflow /home/airflow/data-logistics-service $PRODUCTION_DOMAIN $AIRFLOW__SECRETS__BACKEND $AIRFLOW__SECRETS__BACKEND_KWARGS $AIRFLOW_FERNET_KEY $DAG_GIT_URL"
     - echo "Done"

 # NOTE Light deployment did not perform well when the template/main.html file was changed (in case of the official airflow image being updated)
@@ -119,7 +120,7 @@ light-deploy-production:
   environment: Production
   script:
     - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "cd /home/airflow/data-logistics-service && git stash && git stash clear && git checkout main && git checkout -f $CI_COMMIT_TAG && git pull --all"
-    - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "sudo /home/airflow/data-logistics-service/scripts/deployment.sh /home/airflow /home/airflow/data-logistics-service $PRODUCTION_DOMAIN $AIRFLOW__SECRETS__BACKEND $AIRFLOW__SECRETS__BACKEND_KWARGS" $AIRFLOW_FERNET_KEY
+    - ssh -oStrictHostKeyChecking=accept-new airflow@$PRODUCTION_IP "sudo /home/airflow/data-logistics-service/scripts/deployment.sh /home/airflow /home/airflow/data-logistics-service $PRODUCTION_DOMAIN $AIRFLOW__SECRETS__BACKEND $AIRFLOW__SECRETS__BACKEND_KWARGS $AIRFLOW_FERNET_KEY $DAG_GIT_URL"

 test-production-webserver:
   cache: {}
...
@@ -4,6 +4,7 @@ import requests
 from airflow.decorators import dag, task
 from airflow.utils.dates import days_ago
 from airflow.operators.python import PythonOperator
+from dags.uploadflow import copy_streams

 from decors import setup, get_connection, remove
@@ -21,6 +22,7 @@ def file_exist(sftp, name):

 @dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
 def transfer_image():

     @task
     def stream_upload(connection_id, **kwargs):
         params = kwargs['params']
@@ -47,12 +49,7 @@ def transfer_image():
         with requests.get(url, stream=True, verify=False) as r:
             with sftp_client.open(remote_name, 'wb') as f:
                 f.set_pipelined(pipelined=True)
-                while True:
-                    chunk=r.raw.read(1024 * 1000)
-                    if not chunk:
-                        break
-                    content_to_write = memoryview(chunk)
-                    f.write(content_to_write)
+                copy_streams(input=r, output=f)

     setup_task = PythonOperator(
         python_callable=setup, task_id='setup_connection')
...
@@ -38,20 +38,29 @@ def create_template(hrespo):
         "open_access": hrespo['open_access'] == "True"
     }

+def copy_streams(input, output, chunk_size = 1024 * 1000):
+    while True:
+        chunk = input.raw.read(chunk_size)
+        if not chunk:
+            break
+        content_to_write = memoryview(chunk)
+        output.write(content_to_write)
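copy_streams reads from input.raw, so whatever is passed as input must expose a .raw object with a read(size) method. That holds for a streamed requests.Response (as used in transfer_image above) and for a local file opened in binary mode, whose io.BufferedReader wrapper also exposes .raw. A minimal sketch outside Airflow; the URL and paths are placeholders and the import path depends on how the DAG folder is laid out:

import requests
from uploadflow import copy_streams  # assumed import path

# stream an HTTP response straight to a local file (placeholder URL)
with requests.get("https://example.org/data.img", stream=True) as r:
    with open("/tmp/data.img", "wb") as f:
        copy_streams(input=r, output=f)

# copy a local file: open(..., 'rb') returns an io.BufferedReader, which also has .raw
with open("/tmp/data.img", "rb") as src, open("/tmp/data.copy", "wb") as dst:
    copy_streams(input=src, output=dst)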
-@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
-def upload_example():
-    @task()
-    def load(connection_id, **kwargs):
-        params = kwargs['params']
-        target = Variable.get("working_dir", default_var='/tmp/')
-        source = params.get('source', '/tmp/')
-        ssh_hook = get_connection(conn_id=connection_id, **kwargs)
+def ssh_download(sftp_client, remote, local):
+    #sftp_client.get(remote, local)
+    with sftp_client.open(remote, 'rb') as input:
+        with open(local, 'wb') as output:
+            input.set_pipelined(pipelined=True)
+            copy_streams(input=input, output=output)
+
+def ssh2local_copy(ssh_hook, source: str, target: str):
     with ssh_hook.get_conn() as ssh_client:
         sftp_client = ssh_client.open_sftp()
         lst = sftp_client.listdir(path=source)
+        print(f"{len(lst)} objects in {source}")
         mappings = dict()
         for fname in lst:
             local = tempfile.mktemp(prefix='dls', dir=target)
@@ -61,12 +70,25 @@ def upload_example():
                 print(f"{full_name} is a directory. Skipping")
                 continue
-            print(f"Copying {connection_id}:{full_name} --> {local}")
-            sftp_client.get(os.path.join(source, fname), local)
+            print(f"Copying {full_name} --> {local}")
+            ssh_download(sftp_client=sftp_client, remote=full_name, local=local)
             mappings[local] = fname
     return mappings
+@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
+def upload_example():
+    @task()
+    def load(connection_id, **kwargs):
+        params = kwargs['params']
+        target = Variable.get("working_dir", default_var='/tmp/')
+        source = params.get('source', '/tmp/')
+        ssh_hook = get_connection(conn_id=connection_id, **kwargs)
+        mappings = ssh2local_copy(ssh_hook=ssh_hook, source=source, target=target)
+        return mappings
+
     @task()
     def upload(files: dict, **kwargs):
         connection = Connection.get_connection_from_secrets('default_b2share')
...
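The load task is now only a thin wrapper around ssh2local_copy, which can also be exercised outside a DAG run. A minimal sketch, assuming a hypothetical Airflow SSH connection named 'my_ssh_conn' and the apache-airflow-providers-ssh package already listed in requirements.txt:

from airflow.providers.ssh.hooks.ssh import SSHHook
from uploadflow import ssh2local_copy  # assumed import path

hook = SSHHook(ssh_conn_id='my_ssh_conn')  # hypothetical connection id
# copy every regular file from the remote directory into local temp files
mappings = ssh2local_copy(ssh_hook=hook, source='/data/incoming', target='/tmp/')
print(mappings)  # {local temp file path: original remote file name}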
import os

from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.models.connection import Connection
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from webdav3.client import Client

from uploadflow import ssh2local_copy
from decors import get_connection, remove, setup

default_args = {
    'owner': 'airflow',
}

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def webdav_upload():

    @task()
    def download(connection_id, **kwargs):
        params = kwargs['params']
        target = Variable.get("working_dir", default_var='/tmp/')
        source = params.get('source', '/tmp/')
        ssh_hook = get_connection(conn_id=connection_id, **kwargs)

        mappings = ssh2local_copy(ssh_hook=ssh_hook, source=source, target=target)
        return mappings

    @task()
    def load(mappings, **kwargs):
        params = kwargs['params']
        target = params.get('target', '/airflow-test')
        connection = Connection.get_connection_from_secrets('b2drop_webdav')
        options = {'webdav_hostname': f"https://{connection.host}{connection.schema}",
                   'webdav_login': connection.login,
                   'webdav_password': connection.get_password()
                   }
        print(f"Translated http to webdav: {options}")
        client = Client(options)
        res = client.mkdir(target)
        print(f"Creating {target}: {'ok' if res else 'failed'}")

        print(f"Starting upload -> {target}")
        for [local, true_name] in mappings.items():
            full_name = os.path.join(target, true_name)
            print(f"Processing {local} --> {full_name}")
            client.upload_sync(remote_path=full_name, local_path=local)

            # delete local
            os.unlink(local)

        return True

    @task
    def print_stats(res):
        print('Finished')

    setup_task = PythonOperator(
        python_callable=setup, task_id='setup_connection')
    a_id = setup_task.output['return_value']

    mappings = download(connection_id=a_id)
    res = load(mappings=mappings)

    en = PythonOperator(python_callable=remove, op_kwargs={
        'conn_id': a_id}, task_id='cleanup')

    res >> en

dag = webdav_upload()
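The load task above builds the WebDAV endpoint by concatenating connection.host and connection.schema, so the b2drop_webdav connection has to carry the hostname in host and the WebDAV path in schema. A hedged sketch of what such a connection could look like; the concrete values are placeholders, not taken from this commit:

from airflow.models.connection import Connection

# hypothetical definition of the 'b2drop_webdav' connection used above;
# host and schema are concatenated into f"https://{host}{schema}"
conn = Connection(
    conn_id='b2drop_webdav',
    conn_type='http',
    host='b2drop.eudat.eu',        # placeholder host
    schema='/remote.php/webdav',   # placeholder WebDAV path
    login='my-user',               # placeholder credentials
    password='my-token',
)
print(f"https://{conn.host}{conn.schema}")  # -> https://b2drop.eudat.eu/remote.php/webdav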
 requests
 urllib3
 plyvel
+webdavclient3
 apache-airflow-providers-ssh
 apache-airflow-providers-http
 apache-airflow-providers-sftp
 --index-url https://gitlab.jsc.fz-juelich.de/api/v4/projects/4405/packages/pypi/simple
-airflow-datacat-integration>=0.1.3
+airflow-datacat-integration>=0.1.4
@@ -69,6 +69,7 @@ runcmd:
   - sudo -u airflow git clone https://gitlab.jsc.fz-juelich.de/eflows4hpc-wp2/data-logistics-service.git ./data-logistics-service
   - cd ./data-logistics-service
   - crontab -l | { cat ; echo '@daily root find /persistent_data/logs -mtime +13 -type f -delete'; } | crontab - # setup log clearing crontab
+  - crontab -l | { cat ; echo '*/5 * * * * cd /home/airflow/eflows-airflow/dags && sudo git pull >/dev/null 2>&1'; } | crontab - # setup dag crontab
   - touch /finished_cloudinit

 final_message: "The system is finally up, after $UPTIME seconds"
\ No newline at end of file
@@ -2,7 +2,7 @@
 # @author Maria Petrova & Christian Böttcher
 ## USAGE:
 #
-# deployment.sh <user_home_directory> <git_directory> [SERVER_DOMAIN] [AIRFLOW__SECRETS__BACKEND] [AIRFLOW__SECRETS__BACKEND_KWARGS]
+# deployment.sh <user_home_directory> <git_directory> [SERVER_DOMAIN] [AIRFLOW__SECRETS__BACKEND] [AIRFLOW__SECRETS__BACKEND_KWARGS] [AIRFLOW__CORE__FERNET_KEY] [DAG_GIT_URL]

 OLD_DIR=`pwd`
 GIT_REPO=$HOME/data-logistics-service
@@ -15,6 +15,7 @@ if [ -z ${3+x} ]; then export SERVER_DOMAIN=dls.fz-juelich.de; else export SERVE
 if [ -z ${4+x} ]; then unset AIRFLOW__SECRETS__BACKEND; else export AIRFLOW__SECRETS__BACKEND=$4; fi
 if [ -z ${5+x} ]; then unset AIRFLOW__SECRETS__BACKEND_KWARGS; else export AIRFLOW__SECRETS__BACKEND_KWARGS=$5; fi
 if [ -z ${6+x} ]; then unset AIRFLOW__CORE__FERNET_KEY; else export AIRFLOW__CORE__FERNET_KEY=$6; fi
+if [ -z ${7+x} ]; then unset DAG_GIT_URL; else export DAG_GIT_URL=$7; fi
@@ -22,6 +23,7 @@ echo "DEBUG values: OLD_DIR=$OLD_DIR, ENTRYPOINT_DIR=$ENTRYPOINT and GIT_REPO=$G
 echo "DEBUG using secrets backend: $AIRFLOW__SECRETS__BACKEND"
 echo "DEBUG backend args length: ${#AIRFLOW__SECRETS__BACKEND_KWARGS}"
 #echo "DEBUG fernet key: ${AIRFLOW__CORE__FERNET_KEY}"
+echo "DEBUG DAG git dir: $DAG_GIT_URL"

 cd $ENTRYPOINT
@@ -38,7 +40,7 @@ echo "Proceeding as user $(whoami)"
 # Make the necessary folders for the airflow artefacts and copy the corresponding content
 mkdir -p ./dags ./logs ./plugins ./config ./templates
 cd $GIT_REPO
-cp -r dags/* $AIRFLOW_DIR/dags
+rm -rf $AIRFLOW_DIR/dags/* && rm -rf $AIRFLOW_DIR/dags/.git && git clone $DAG_GIT_URL $AIRFLOW_DIR/dags
 cp -r plugins/* $AIRFLOW_DIR/plugins
 cp config/* $AIRFLOW_DIR/config/
 cp -r templates/* $AIRFLOW_DIR/templates
...
import tempfile
import unittest
from unittest.mock import MagicMock, patch
import os

from dags.uploadflow import ssh2local_copy, copy_streams

class TestSSH(unittest.TestCase):

    @patch('dags.uploadflow.tempfile.mktemp')
    def test_copy_files(self, tmp):
        tmp.side_effect = ['tmpA', 'tmpB']
        my_hook = MagicMock()
        a = MagicMock()
        a.return_value = ['a', 'c']
        stat = MagicMock(side_effect=['elo', 'elo'])
        cpy = MagicMock(return_value=False)

        my_hook.get_conn().__enter__().open_sftp().listdir = a
        my_hook.get_conn().__enter__().open_sftp().stat = stat
        my_hook.get_conn().__enter__().open_sftp().open().__enter__().raw.read = cpy

        mapps = ssh2local_copy(ssh_hook=my_hook, source='srcZ', target='trg')

        my_hook.get_conn.assert_any_call()
        a.assert_called_once_with(path='srcZ')
        cpy.assert_called()

        print(mapps)
        self.assertEqual(len(mapps), 2)

    @patch('dags.uploadflow.tempfile.mktemp')
    def test_skipdir_files(self, tmp):
        tmp.side_effect = ['tmpA', 'tmpB']
        my_hook = MagicMock()
        a = MagicMock()
        a.return_value = ['a', 'c']
        stat = MagicMock(side_effect=['elo', 'd elo'])
        cpy = MagicMock(return_value=False)

        my_hook.get_conn().__enter__().open_sftp().listdir = a
        my_hook.get_conn().__enter__().open_sftp().stat = stat
        my_hook.get_conn().__enter__().open_sftp().open().__enter__().raw.read = cpy

        mapps = ssh2local_copy(ssh_hook=my_hook, source='srcZ', target='trg')

        my_hook.get_conn.assert_any_call()
        a.assert_called_once_with(path='srcZ')
        cpy.assert_called()

        print(mapps)
        self.assertEqual(len(mapps), 1)

    def test_copy_streams(self):
        """
        def copy_streams(input, output):
        """
        with tempfile.TemporaryDirectory() as dir:
            text = 'Some input text'
            input_name = os.path.join(dir, 'input.txt')
            output_name = os.path.join(dir, 'output')

            with open(input_name, 'w') as fln:
                fln.write(text)

            with open(input_name, 'rb') as input:
                with open(output_name, 'wb') as output:
                    copy_streams(input=input, output=output)

            with open(output_name, 'r') as f:
                txt = f.read()
                print("Read following: ", txt)

                self.assertEqual(text, txt)
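A companion test for ssh_download could follow the same mocking pattern as the cases above. This is only a sketch, not part of the commit; it reuses the imports of this test module and assumes ssh_download is importable from dags.uploadflow, as added in uploadflow.py:

class TestSSHDownloadSketch(unittest.TestCase):
    """Hypothetical extra test, not part of this commit."""

    def test_ssh_download(self):
        from dags.uploadflow import ssh_download  # assumed import path

        sftp = MagicMock()
        # first read() returns data, the empty second chunk makes copy_streams stop
        sftp.open.return_value.__enter__.return_value.raw.read = MagicMock(side_effect=[b'payload', b''])

        with tempfile.TemporaryDirectory() as dir:
            local = os.path.join(dir, 'out.bin')
            ssh_download(sftp_client=sftp, remote='remote.bin', local=local)

            with open(local, 'rb') as f:
                self.assertEqual(f.read(), b'payload')

        sftp.open.assert_called_once_with('remote.bin', 'rb')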