Commit 7876b542 authored by Maria Petrova-El Sayed

Merged branch main (including datacat_integration) into mptest. CI/CD updated

parents aa8b6560 9d966404
.gitignore
......@@ -2,6 +2,8 @@
dockers/.env
.vscode/*
*.pyc
logs/
.env
# contains data for local tests
.coverage
.gitlab-ci.yml
default:
image: python:3.9-slim
variables:
OS_AUTH_TYPE: v3applicationcredential
OS_AUTH_URL: https://hdf-cloud.fz-juelich.de:5000
......@@ -11,6 +12,10 @@ variables:
TESTING_NAME: airflow-testing
TESTING_URL: http://134.94.199.220:7001/home
AIRFLOW_TESTUSER: "airflow"
AIRFLOW__SECRETS__BACKEND_KWARGS: '{"url" : "https://zam10036.zam.kfa-juelich.de", "user" : "${DATACAT_TESTING_USERNAME}", "password" : "${DATACAT_TESTING_PASSWORD}"}'
AIRFLOW__SECRETS__BACKEND: datacat_integration.secrets.DatacatSecretsBackend
DOCKER_TLS_CERTDIR: ""
# before script copied from gitlab docs
.before_script_template: &ssh_setup
......@@ -23,6 +28,8 @@ variables:
stages:
- test
- build
- publish
- deploy
- test-deployment
- cleanup
......@@ -30,9 +37,11 @@ stages:
test:
stage: test
image:
- name: apache/airflow:2.1.3
+ name: apache/airflow:2.2.3
entrypoint: [""]
before_script:
- echo "DEBUG:"
- pip --version
- airflow db init
- pip install -r requirements.txt
- pip install nose==1.3.7
......@@ -46,7 +55,28 @@ test:
- airflow dags test testdag 2021-08-18
- nosetests
full-deploy-testing:
build-custom-image:
stage: build
image: docker:latest
services:
- docker:dind
when: manual
tags:
- laptop
variables:
IMAGE_COMMIT_TAG: $CI_REGISTRY_IMAGE/eflows-airflow:$CI_COMMIT_SHORT_SHA
IMAGE_LATEST_TAG: $CI_REGISTRY_IMAGE/eflows-airflow:latest
script:
- docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY
- docker build --no-cache=true --pull -t $IMAGE_COMMIT_TAG -f dockers/eflows-airflow.docker .
- docker push $IMAGE_COMMIT_TAG
- docker tag $IMAGE_COMMIT_TAG $IMAGE_LATEST_TAG
- docker push $IMAGE_LATEST_TAG
deploy-testing:
stage: deploy
environment: Testing
only:
......@@ -61,7 +91,8 @@ full-deploy-testing:
- openstack server add floating ip $INSTANCE_ID $TESTING_IP
- echo "Done"
# TODO test light deployment
# NOTE Light deployment did not perform well when the template/main.html file was changed (in case of the official airflow image being updated)
# TODO Add proper tests
light-deploy-testing:
stage: deploy
# only run when master is updated, unless the pipeline was triggered via the web UI
......@@ -77,7 +108,7 @@ light-deploy-testing:
- ssh -oStrictHostKeyChecking=accept-new airflow@$TESTING_IP "cd /home/airflow/data-logistics-service && sudo git stash && sudo git pull --all && sudo git checkout -f $CI_COMMIT_TAG && sudo git stash clear"
- ssh -oStrictHostKeyChecking=accept-new airflow@$TESTING_IP "sudo /home/airflow/data-logistics-service/scripts/deployment.sh /home/airflow /home/airflow/data-logistics-service"
- test-testing_dags:
+ test-testingdeployment_webserver:
cache: {}
stage: test-deployment
only:
......@@ -131,3 +162,16 @@ cleanup-failed-full-deployment:
openstack server delete $REMOVE_ID && echo "Deleted faulty testing server.";
fi
# gitlab should automatically alert the devs about this failure
publishgit-do:
stage: publish
only:
- tags
tags: [stable]
script:
- apt-get update
- apt-get install -y git
- (git remote rm gith) || echo "Not found"
- (git remote -v | grep gith) || git remote add gith "https://${GITHUB_USER}:${GITHUB_TOKEN}@github.com/eflows4hpc/data-logistics-service.git"
- git remote -v
- git fetch --unshallow origin
- git push gith +HEAD:refs/heads/main
LICENSE 0 → 100644
BSD 2-Clause License
Copyright (c) 2021, Forschungszentrum Juelich GmbH
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from typing import Dict
from airflow.decorators import dag, task
from airflow.models.connection import Connection
from airflow.utils.dates import days_ago
from airflow import settings
import logging
from sqlalchemy.orm.session import Session as SASession
from datacat_integration.secrets import DataCatConnectionWithSecrets
from datacat_integration.hooks import DataCatalogHook
default_args = {
'owner': 'airflow',
}
connections_type = "airflow_connections"
substring_start = len(connections_type) + 1
substring_end = substring_start + 36 # length of a UUID4
log = logging.getLogger(__name__)
def get_conn_name(datacat_type: str, oid: str):
return "{}/{}-connection".format(datacat_type, oid)
def get_normal_or_secret_property(key: str, props: Dict[str,str], secrets: Dict[str, str], default_value = None):
return props.get(key, secrets.get(key, default_value))
def get_connection(hook: DataCatalogHook, datacat_type: str, oid: str):
conn_id = get_conn_name(datacat_type, oid)
secrets_connection = DataCatConnectionWithSecrets(hook.connection.url, hook.connection.user, hook.connection._password)
datacat_entry: Dict[str,str] = secrets_connection.get_object(datacat_type, oid)['metadata']
datacat_entry_secrets = secrets_connection.get_all_secret_key_value(datacat_type, oid)
extra={}
predefined_keys = ['conn_type', 'description', 'host', 'login', 'password', 'schema', 'port']
# build extra from non-predefined keys
for key in datacat_entry:
if key not in predefined_keys:
extra[key] = datacat_entry[key]
for key in datacat_entry_secrets:
if key not in predefined_keys:
extra[key] = datacat_entry_secrets[key]
return Connection(
conn_id=conn_id,
conn_type=get_normal_or_secret_property('conn_type', datacat_entry, datacat_entry_secrets),
description=get_normal_or_secret_property('description', datacat_entry, datacat_entry_secrets, 'Automatically generated Connection from the datacatalog object {}/{}'.format(connections_type, oid)),
host=get_normal_or_secret_property('host', datacat_entry, datacat_entry_secrets),
login=get_normal_or_secret_property('login', datacat_entry, datacat_entry_secrets),
password=get_normal_or_secret_property('password', datacat_entry, datacat_entry_secrets),
schema=get_normal_or_secret_property('schema', datacat_entry, datacat_entry_secrets),
port=int(get_normal_or_secret_property('port', datacat_entry, datacat_entry_secrets)),
extra=extra
)
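To illustrate the mapping above (the catalog object shown is hypothetical): keys in predefined_keys populate the Connection's named fields, everything else is folded into extra, and port is cast from the string the catalog stores. Note that int(...) raises a TypeError if neither the entry nor its secrets define a port.

    # Hypothetical catalog object, for illustration only.
    datacat_entry = {"conn_type": "ssh", "host": "demo.example.org", "port": "22",
                     "strict_host_key_checking": "no"}
    datacat_entry_secrets = {"password": "s3cret"}
    # -> Connection(conn_type="ssh", host="demo.example.org", port=22,
    #               password="s3cret", extra={"strict_host_key_checking": "no"})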
@dag(default_args=default_args, schedule_interval='@hourly', start_date=days_ago(1), tags=['dls-service-dag'])
def sync_connections():
@task
def list_catalog_connections(**kwargs):
hook = DataCatalogHook("datacatalog")
objects = hook.list_type(connections_type)
oid_list = [element[1] for element in objects]
return oid_list
@task
def list_airflow_connections(**kwargs):
session: SASession = settings.Session()
conns = session.query(Connection).filter(Connection.conn_id.like("{}/%-connection".format(connections_type)))
oid_list = [conn.conn_id[substring_start:substring_end] for conn in conns]
return oid_list
@task
def get_add_list(catalog_connections, airflow_connections, **kwargs):
return list(set(catalog_connections).difference(airflow_connections))
@task
def get_remove_list(catalog_connections, airflow_connections, **kwargs):
return list(set(airflow_connections).difference(catalog_connections))
@task
def remove_connections(oid_list, **kwargs):
log.info("Going to remove from conections: " + ','.join(oid_list))
session : SASession = settings.Session()
for oid in oid_list:
session.query(Connection).filter(Connection.conn_id == get_conn_name(connections_type, oid)).delete()
session.commit()
@task
def add_connections(oid_list, **kwargs):
log.info("Going to add to conections: " + ','.join(oid_list))
hook = DataCatalogHook("datacatalog")
connections = []
for oid in oid_list:
connections.append(get_connection(hook, connections_type, oid))
session = settings.Session()
# no existence check necessary, since get_add_list() already filters out existing connections
for conn in connections:
session.add(conn)
session.commit()
cat_conn = list_catalog_connections()
air_conn = list_airflow_connections()
add_list = get_add_list(cat_conn, air_conn)
remove_list = get_remove_list(cat_conn, air_conn)
add_connections(add_list)
remove_connections(remove_list)
dag = sync_connections()
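The task wiring above is plain set reconciliation; a toy example (ids shortened for illustration):

    catalog = {"a", "b", "c"}   # oids found in the Data Catalog
    airflow = {"b", "c", "d"}   # oids already registered as Airflow connections
    assert catalog.difference(airflow) == {"a"}   # get_add_list: connections to create
    assert airflow.difference(catalog) == {"d"}   # get_remove_list: connections to delete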
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.hooks.base import BaseHook
default_args = {
'owner': 'airflow',
}
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def test_secrets_backend():
@task()
def get_print_and_return_connection():
conn = BaseHook.get_connection('860355e9-975f-4253-9421-1815e20c879b')
print(conn.get_extra())
get_print_and_return_connection()
dag = test_secrets_backend()
......@@ -2,15 +2,16 @@
import os
import tempfile
from airflow import settings
from airflow.decorators import dag, task
from airflow.models.connection import Connection
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.utils.dates import days_ago
from b2shareoperator import (add_file, create_draft_record,
get_record_template, submit_draft)
default_args = {
'owner': 'airflow',
}
......@@ -18,12 +19,40 @@ default_args = {
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def upload_example():
@task
def setup(**kwargs):
print(f"Setting up the connection")
params = kwargs['params']
rrid = kwargs['run_id']
host = params.get('host')
port = params.get('port', 2222)
user = params.get('login', 'eflows')
key = params.get('key')
conn_id = f"tmp_connection_{rrid}"
extra = {"private_key": key}
conn = Connection(
conn_id=conn_id,
conn_type='ssh',
description='Automatically generated Connection',
host=host,
login=user,
port=port,
extra=extra
)
session = settings.Session()
session.add(conn)
session.commit()
print(f"Connection {conn_id} created")
return conn_id
@task()
- def load(**kwargs):
+ def load(connection_id, **kwargs):
params = kwargs['params']
target = params.get('target', '/tmp/')
source = params.get('source', '/tmp/')
connection_id = params.get('connection', 'default_ssh')
ssh_hook = SSHHook(ssh_conn_id=connection_id)
with ssh_hook.get_conn() as ssh_client:
......@@ -51,9 +80,29 @@ def upload_example():
# hate such hacks:
server = "https://" + connection.host
token = connection.extra_dejson['access_token']
print(f"Server: {server} + {token}")
template = get_record_template()
params = kwargs['params']
mid = params['mid']
hook = HttpHook(http_conn_id='datacat', method='GET')
hrespo = hook.run(endpoint=f"storage_target/{mid}").json()['metadata']
print(hrespo)
template = {
"titles" : [{"title":hrespo['title']}],
"creators" : [{"creator_name": hrespo['creator_name']}],
"descriptions" :[
{
"description": hrespo['description'],
"description_type": "Abstract"
}
],
"community": "a9217684-945b-4436-8632-cac271f894ed",
'community_specific':
{'91ae5d2a-3848-4693-9f7d-cbd141172ef0': {'helmholtz centre': ['Forschungszentrum Jülich']}},
"open_access": hrespo['open_access']=="True"
}
r = create_draft_record(server=server, token=token, record=template)
print(r)
print(f"Draft record created {r['id']} --> {r['links']['self']}")
......@@ -65,9 +114,22 @@ def upload_example():
print("Submitting record for pubication")
submitted = submit_draft(record=r, token=token)
print(f"Record created {submitted['id']}")
return submitted['id']
files = load()
upload(files)
@task()
def remove(conn_id, uid):
print(f"Upload {uid} completed. Removing conneciton {conn_id}")
session = settings.Session()
for con in session.query(Connection).all():
print(con)
session.query(Connection).filter(Connection.conn_id == conn_id).delete()
session.commit()
conn_id = setup()
files = load(connection_id=conn_id)
uid = upload(files)
remove(conn_id=conn_id, uid=uid)
dag = upload_example()
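A minimal sketch of triggering this DAG with the params it expects, via Airflow's stable REST API (all host names, credentials, and values below are placeholders; basic-auth API access is enabled in the compose file that follows):

    import requests

    # Placeholders throughout; adjust to the actual deployment.
    resp = requests.post(
        "http://localhost:7001/api/v1/dags/upload_example/dagRuns",
        auth=("airflow", "<password>"),  # AIRFLOW__API__AUTH_BACKEND: basic_auth
        json={"conf": {
            "host": "ssh.example.org",    # machine the load() task copies from
            "port": 2222,
            "login": "eflows",
            "key": "-----BEGIN OPENSSH PRIVATE KEY-----\n...",
            "source": "/tmp/file.txt",    # remote path to fetch
            "target": "/tmp/",            # local staging directory
            "mid": "<storage_target id>"  # Data Catalog record used by upload()
        }},
    )
    resp.raise_for_status()
    print(resp.json()["dag_run_id"])

Note that the compose file pauses new DAGs at creation, so the DAG has to be unpaused before the run is scheduled.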
......@@ -56,6 +56,8 @@ x-airflow-common:
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
AIRFLOW__SECRETS__BACKEND_KWARGS: ${AIRFLOW__SECRETS__BACKEND_KWARGS}
AIRFLOW__SECRETS__BACKEND: ${AIRFLOW__SECRETS__BACKEND}
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
volumes:
- ./dags:/opt/airflow/dags
......
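The two ${...} references above are typically supplied through dockers/.env (which the .gitignore hunk at the top keeps out of version control); a sketch with placeholder credentials:

    AIRFLOW__SECRETS__BACKEND=datacat_integration.secrets.DatacatSecretsBackend
    AIRFLOW__SECRETS__BACKEND_KWARGS={"url": "https://zam10036.zam.kfa-juelich.de", "user": "<datacat user>", "password": "<datacat password>"}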
dockers/eflows-airflow.docker
FROM apache/airflow:2.2.3
USER root
RUN apt update && apt install git -y
USER airflow
RUN pip --version
ADD requirements.txt /requirements.txt
RUN pip install -r /requirements.txt
requirements.txt
requests
- urllib3==1.26.6
+ urllib3
plyvel
apache-airflow-providers-ssh
apache-airflow-providers-http
apache-airflow-providers-sftp
--index-url https://gitlab.jsc.fz-juelich.de/api/v4/projects/4405/packages/pypi/simple
airflow-datacat-integration>=0.1.1
This product is licensed to you under the Apache License, Version 2.0
(the "License"). You may not use this product except in compliance with
the License.
This product may include a number of subcomponents with separate
copyright notices and license terms. Your use of the source code for
these subcomponents is subject to the terms and conditions of the
subcomponent's license, as noted in the license.txt file.
The files in this folder have been modified by Forschungszentrum Juelich GmbH.
=======================================================================
BSD 2-Clause License
Copyright (c) 2021, Forschungszentrum Juelich GmbH
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.