diff --git a/.gitignore b/.gitignore
index 6bab01222a52bba92a48373a7755931e5779879a..382dac6d445b61c2891663e679c5efbce428a87c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,8 @@
 dockers/.env
 .vscode/*
 *.pyc
+logs/
+.env # contains data for local tests
 .coverage
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index f371db0ad9a35fdefe8ebe779814ac82dc70761d..83e7ecb3bb61a4b54ac274886365b635eac987eb 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -1,5 +1,6 @@
 default:
   image: python:3.9-slim
+
 variables:
   OS_AUTH_TYPE: v3applicationcredential
   OS_AUTH_URL: https://hdf-cloud.fz-juelich.de:5000
@@ -11,6 +12,10 @@ variables:
   TESTING_NAME: airflow-testing
   TESTING_URL: http://134.94.199.220:7001/home
   AIRFLOW_TESTUSER: "airflow"
+  AIRFLOW__SECRETS__BACKEND_KWARGS: '{"url" : "https://zam10036.zam.kfa-juelich.de", "user" : "${DATACAT_TESTING_USERNAME}", "password" : "${DATACAT_TESTING_PASSWORD}"}'
+  AIRFLOW__SECRETS__BACKEND: datacat_integration.secrets.DatacatSecretsBackend
+  DOCKER_TLS_CERTDIR: ""
+
 # before script copied from gitlab docs
 .before_script_template: &ssh_setup
@@ -23,6 +28,8 @@ variables:
 stages:
   - test
+  - build
+  - publish
   - deploy
   - test-deployment
   - cleanup
@@ -30,9 +37,11 @@ stages:
 test:
   stage: test
   image:
-    name: apache/airflow:2.1.3
+    name: apache/airflow:2.2.3
     entrypoint: [""]
   before_script:
+    - echo "DEBUG:"
+    - pip --version
     - airflow db init
     - pip install -r requirements.txt
     - pip install nose==1.3.7
@@ -46,10 +55,31 @@ test:
     - airflow dags test testdag 2021-08-18
     - nosetests

-full-deploy-testing:
+
+build-custom-image:
+  stage: build
+  image: docker:latest
+  services:
+    - docker:dind
+  when: manual
+  tags:
+    - laptop
+  variables:
+    IMAGE_COMMIT_TAG: $CI_REGISTRY_IMAGE/eflows-airflow:$CI_COMMIT_SHORT_SHA
+    IMAGE_LATEST_TAG: $CI_REGISTRY_IMAGE/eflows-airflow:latest
+
+  script:
+    - docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY
+    - docker build --no-cache=true --pull -t $IMAGE_COMMIT_TAG -f dockers/eflows-airflow.docker .
+    - docker push $IMAGE_COMMIT_TAG
+    - docker tag $IMAGE_COMMIT_TAG $IMAGE_LATEST_TAG
+    - docker push $IMAGE_LATEST_TAG
+
+
+deploy-testing:
   stage: deploy
   environment: Testing
-  only:
+  only:
   - web
   script:
     - echo "Starting the full testing deployment of airflows example."
@@ -61,7 +91,8 @@ full-deploy-testing:
     - openstack server add floating ip $INSTANCE_ID $TESTING_IP
     - echo "Done"

-# TODO test light deployment
+# NOTE Light deployment did not perform well when the template/main.html file was changed (in case of the official airflow image being updated)
+# TODO Add proper tests
 light-deploy-testing:
   stage: deploy
   # only run when master is updated, unless the pipeline was triggered via the web UI
@@ -77,7 +108,7 @@ light-deploy-testing:
     - ssh -oStrictHostKeyChecking=accept-new airflow@$TESTING_IP "cd /home/airflow/data-logistics-service && sudo git stash && sudo git pull --all && sudo git checkout -f $CI_COMMIT_TAG && sudo git stash clear"
     - ssh -oStrictHostKeyChecking=accept-new airflow@$TESTING_IP "sudo /home/airflow/data-logistics-service/scripts/deployment.sh /home/airflow /home/airflow/data-logistics-service"

-test-testing_dags:
+test-testingdeployment_webserver:
   cache: {}
   stage: test-deployment
   only:
@@ -130,4 +161,17 @@ cleanup-failed-full-deployment:
       openstack server add floating ip $OLD_TEST_ID $TESTING_IP;
       openstack server delete $REMOVE_ID && echo "Deleted faulty testing server.";
       fi
-  # gitlab should automatically alert the devs about this failure
\ No newline at end of file
+  # gitlab should automatically alert the devs about this failure
+publishgit-do:
+  stage: publish
+  only:
+    - tags
+  tags: [stable]
+  script:
+    - apt-get update
+    - apt-get install -y git
+    - (git remote rm gith) || echo "Not found"
+    - (git remote -v | grep gith) || git remote add gith "https://${GITHUB_USER}:${GITHUB_TOKEN}@github.com/eflows4hpc/data-logistics-service.git"
+    - git remote -v
+    - git fetch --unshallow origin
+    - git push gith +HEAD:refs/heads/main
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000000000000000000000000000000000000..7623188996d969af74a826e53105b4c0a070d25f
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,25 @@
+BSD 2-Clause License
+
+Copyright (c) 2021, Forschungszentrum Juelich GmbH
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this
+   list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice,
+   this list of conditions and the following disclaimer in the documentation
+   and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/config/airflow.cfg b/config/airflow.cfg
index b304e72a91e130ec0318b88dbad24cd096d8d6b2..e5dba33dc3655a56c33841e4935812ece961c238 100644
--- a/config/airflow.cfg
+++ b/config/airflow.cfg
@@ -353,7 +353,7 @@ backend =
 # See documentation for the secrets backend you are using. JSON is expected.
 # Example for AWS Systems Manager ParameterStore:
 # ``{"connections_prefix": "/airflow/connections", "profile_name": "default"}``
-backend_kwargs =
+backend_kwargs = 

 [cli]
 # In what way should the cli access the API. The LocalClient will use the
diff --git a/dags/datacat_connection_sync.py b/dags/datacat_connection_sync.py
new file mode 100644
index 0000000000000000000000000000000000000000..8a8f096dc29d94d333aef39b9c61432698f7fe44
--- /dev/null
+++ b/dags/datacat_connection_sync.py
@@ -0,0 +1,119 @@
+
+from typing import Dict
+from airflow.decorators import dag, task
+from airflow.models.connection import Connection
+from airflow.utils.dates import days_ago
+from airflow import settings
+import logging
+from sqlalchemy.orm.session import Session as SASession
+from datacat_integration.secrets import DataCatConnectionWithSecrets
+
+from datacat_integration.hooks import DataCatalogHook
+
+default_args = {
+    'owner': 'airflow',
+}
+
+connections_type = "airflow_connections"
+substring_start = len(connections_type) + 1
+substring_end = substring_start + 36 # length of a UUID4
+
+log = logging.getLogger(__name__)
+
+def get_conn_name(datacat_type: str, oid: str):
+    return "{}/{}-connection".format(datacat_type, oid)
+
+def get_normal_or_secret_property(key: str, props: Dict[str,str], secrets: Dict[str, str], default_value = None):
+    return props.get(key, secrets.get(key, default_value))
+
+
+def get_connection(hook: DataCatalogHook, datacat_type: str, oid: str):
+    conn_id = get_conn_name(datacat_type, oid)
+    secrets_connection = DataCatConnectionWithSecrets(hook.connection.url, hook.connection.user, hook.connection._password)
+    datacat_entry: Dict[str,str] = secrets_connection.get_object(datacat_type, oid)['metadata']
+    datacat_entry_secrets = secrets_connection.get_all_secret_key_value(datacat_type, oid)
+    extra={}
+    predefined_keys = ['conn_type', 'description', 'host', 'login', 'password', 'schema', 'port']
+    # build extra from non-predefined keys
+    for key in datacat_entry:
+        if key not in predefined_keys:
+            extra[key] = datacat_entry[key]
+
+    for key in datacat_entry_secrets:
+        if key not in predefined_keys:
+            extra[key] = datacat_entry_secrets[key]
+
+
+    return Connection(
+        conn_id=conn_id,
+        conn_type=get_normal_or_secret_property('conn_type', datacat_entry, datacat_entry_secrets),
+        description=get_normal_or_secret_property('description', datacat_entry, datacat_entry_secrets, 'Automatically generated Connection from the datacatalog object {}/{}'.format(connections_type, oid)),
+        host=get_normal_or_secret_property('host', datacat_entry, datacat_entry_secrets),
+        login=get_normal_or_secret_property('login', datacat_entry, datacat_entry_secrets),
+        password=get_normal_or_secret_property('password', datacat_entry, datacat_entry_secrets),
+        schema=get_normal_or_secret_property('schema', datacat_entry, datacat_entry_secrets),
+        port=int(get_normal_or_secret_property('port', datacat_entry, datacat_entry_secrets)),
+        extra=extra
+    )
+
+
+@dag(default_args=default_args, schedule_interval='@hourly', start_date=days_ago(1), tags=['dls-service-dag'])
+def sync_connections():
+
+    @task
+    def list_catalog_connections(**kwargs):
+        hook = DataCatalogHook("datacatalog")
+        objects = hook.list_type(connections_type)
+        oid_list = [element[1] for element in objects]
+        return oid_list
+
+    @task
+    def list_airflow_connections(**kwargs):
+        session : SASession = settings.Session()
+        conns = session.query(Connection).filter(Connection.conn_id.like("{}/%-connection".format(connections_type)))
+        oid_list = [conn.conn_id[substring_start:substring_end] for conn in conns]
+        return oid_list
+
+    @task
+    def get_add_list(catalog_connections, airflow_connections, **kwargs):
+        return list(set(catalog_connections).difference(airflow_connections))
+
+    @task
+    def get_remove_list(catalog_connections, airflow_connections, **kwargs):
+        return list(set(airflow_connections).difference(catalog_connections))
+
+    @task
+    def remove_connections(oid_list, **kwargs):
+        log.info("Going to remove from connections: " + ','.join(oid_list))
+        session : SASession = settings.Session()
+        for oid in oid_list:
+            session.query(Connection).filter(Connection.conn_id == get_conn_name(connections_type, oid)).delete()
+        session.commit()
+
+    @task
+    def add_connections(oid_list, **kwargs):
+        log.info("Going to add to connections: " + ','.join(oid_list))
+        hook = DataCatalogHook("datacatalog")
+        connections = []
+        for oid in oid_list:
+            connections.append(get_connection(hook, connections_type, oid))
+
+        session = settings.Session()
+        # no check for existence necessary, since it is handled by get_add_list()
+        for conn in connections:
+            session.add(conn)
+
+        session.commit()
+
+    cat_conn = list_catalog_connections()
+    air_conn = list_airflow_connections()
+
+    add_list = get_add_list(cat_conn, air_conn)
+    remove_list = get_remove_list(cat_conn, air_conn)
+
+    add_connections(add_list)
+
+    remove_connections(remove_list)
+
+
+dag = sync_connections()
diff --git a/dags/test_secrets_backend.py b/dags/test_secrets_backend.py
new file mode 100644
index 0000000000000000000000000000000000000000..f8409669f8b02486f0f49743df0876f560f452f4
--- /dev/null
+++ b/dags/test_secrets_backend.py
@@ -0,0 +1,21 @@
+
+from airflow.decorators import dag, task
+from airflow.utils.dates import days_ago
+from airflow.hooks.base import BaseHook
+
+default_args = {
+    'owner': 'airflow',
+}
+
+
+@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
+def test_secrets_backend():
+    @task()
+    def get_print_and_return_connection():
+        conn = BaseHook.get_connection('860355e9-975f-4253-9421-1815e20c879b')
+        print(conn.get_extra())
+
+    get_print_and_return_connection()
+
+
+dag = test_secrets_backend()
diff --git a/dags/uploadflow.py b/dags/uploadflow.py
index 5f99de95e233c6ec0a486037ca44625fb6f70863..6df478109069c94f1958eddae7aee89821742bcb 100644
--- a/dags/uploadflow.py
+++ b/dags/uploadflow.py
@@ -2,15 +2,16 @@
 import os
 import tempfile

+from airflow import settings
 from airflow.decorators import dag, task
 from airflow.models.connection import Connection
+from airflow.providers.http.hooks.http import HttpHook
 from airflow.providers.ssh.hooks.ssh import SSHHook
 from airflow.utils.dates import days_ago

 from b2shareoperator import (add_file, create_draft_record,
                              get_record_template, submit_draft)
-
 default_args = {
     'owner': 'airflow',
 }
@@ -18,12 +19,40 @@ default_args = {
 @dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
 def upload_example():
+    @task
+    def setup(**kwargs):
+        print(f"Setting up the connection")
+
+        params = kwargs['params']
+        rrid = kwargs['run_id']
+        host = params.get('host')
+        port = params.get('port', 2222)
+        user = params.get('login', 'eflows')
+        key = params.get('key')
+
+        conn_id = f"tmp_connection_{rrid}"
+        extra = {"private_key": key}
+        conn = Connection(
+            conn_id=conn_id,
+            conn_type='ssh',
+            description='Automatically generated Connection',
+            host=host,
+            login=user,
+            port=port,
+            extra=extra
+        )
+
+        session = settings.Session()
+        session.add(conn)
+        session.commit()
+        print(f"Connection {conn_id} created")
+        return conn_id
+
     @task()
-    def load(**kwargs):
+    def load(connection_id, **kwargs):
         params = kwargs['params']
         target = params.get('target', '/tmp/')
         source = params.get('source', '/tmp/')
-        connection_id = params.get('connection', 'default_ssh')

         ssh_hook = SSHHook(ssh_conn_id=connection_id)
         with ssh_hook.get_conn() as ssh_client:
@@ -51,9 +80,29 @@ def upload_example():
         # hate such hacks:
         server = "https://" + connection.host
         token = connection.extra_dejson['access_token']
-        print(f"Server: {server} + {token}")
+
-        template = get_record_template()
+        params = kwargs['params']
+        mid = params['mid']
+
+        hook = HttpHook(http_conn_id='datacat', method='GET')
+        hrespo = hook.run(endpoint=f"storage_target/{mid}").json()['metadata']
+        print(hrespo)
+
+        template = {
+            "titles" : [{"title":hrespo['title']}],
+            "creators" : [{"creator_name": hrespo['creator_name']}],
+            "descriptions" :[
+                {
+                    "description": hrespo['description'],
+                    "description_type": "Abstract"
+                }
+            ],
+            "community": "a9217684-945b-4436-8632-cac271f894ed",
+            'community_specific':
+                {'91ae5d2a-3848-4693-9f7d-cbd141172ef0': {'helmholtz centre': ['Forschungszentrum Jülich']}},
+            "open_access": hrespo['open_access']=="True"
+        }
         r = create_draft_record(server=server, token=token, record=template)
         print(r)
         print(f"Draft record created {r['id']} --> {r['links']['self']}")
@@ -65,9 +114,22 @@ def upload_example():
         print("Submitting record for pubication")
         submitted = submit_draft(record=r, token=token)
         print(f"Record created {submitted['id']}")
+        return submitted['id']
+
+    @task()
+    def remove(conn_id, uid):
+        print(f"Upload {uid} completed. Removing connection {conn_id}")
+        session = settings.Session()
+        for con in session.query(Connection).all():
+            print(con)
+
+        session.query(Connection).filter(Connection.conn_id == conn_id).delete()
+        session.commit()

-    files = load()
-    upload(files)
+    conn_id = setup()
+    files = load(connection_id=conn_id)
+    uid = upload(files)
+    remove(conn_id=conn_id, uid=uid)


 dag = upload_example()
diff --git a/dockers/docker-compose.yaml b/dockers/docker-compose.yaml
index e5e483bf4b64d9d1604f5550cee63829df351510..19ab20482746b5f3f914c4540cbf27dcf85d82f6 100644
--- a/dockers/docker-compose.yaml
+++ b/dockers/docker-compose.yaml
@@ -56,6 +56,8 @@ x-airflow-common:
     AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
     AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
     AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
+    AIRFLOW__SECRETS__BACKEND_KWARGS: ${AIRFLOW__SECRETS__BACKEND_KWARGS}
+    AIRFLOW__SECRETS__BACKEND: ${AIRFLOW__SECRETS__BACKEND}
     _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
   volumes:
     - ./dags:/opt/airflow/dags
diff --git a/dockers/eflows-airflow.docker b/dockers/eflows-airflow.docker
new file mode 100644
index 0000000000000000000000000000000000000000..7aae52533bf7c8155acb76f1fac2eeb3f30ceb54
--- /dev/null
+++ b/dockers/eflows-airflow.docker
@@ -0,0 +1,9 @@
+FROM apache/airflow:2.2.3
+
+USER root
+RUN apt update && apt install git -y
+USER airflow
+
+RUN pip --version
+ADD requirements.txt /requirements.txt
+RUN pip install -r /requirements.txt
diff --git a/requirements.txt b/requirements.txt
index aeecd2f8a68c7beca6f3259e5c3642f6349f8984..bc0dc1875165ec94b506d9b4a5525fd946feaea8 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,8 @@
 requests
-urllib3==1.26.6
+urllib3
 plyvel
 apache-airflow-providers-ssh
 apache-airflow-providers-http
 apache-airflow-providers-sftp
+--index-url https://gitlab.jsc.fz-juelich.de/api/v4/projects/4405/packages/pypi/simple
+airflow-datacat-integration>=0.1.1
diff --git a/templates/NOTICE b/templates/NOTICE
new file mode 100644
index 0000000000000000000000000000000000000000..fd9d4f0e10f48f142389028975d2c2764e0de55b
--- /dev/null
+++ b/templates/NOTICE
@@ -0,0 +1,38 @@
+This product is licensed to you under the Apache License, Version 2.0
+(the "License"). You may not use this product except in compliance with
+the License.
+
+This product may include a number of subcomponents with separate
+copyright notices and license terms. Your use of the source code for
+these subcomponents is subject to the terms and conditions of the
+subcomponent's license, as noted in the license.txt file.
+
+The files in this folder have been modified by Forschungszentrum Juelich GmbH.
+
+=======================================================================
+
+BSD 2-Clause License
+
+Copyright (c) 2021, Forschungszentrum Juelich GmbH
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this
+   list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice,
+   this list of conditions and the following disclaimer in the documentation
+   and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.