Commit 8746bbf5 authored by Christian Boettcher

Merge branch 'datacat_integration' into 'main'

Datacat integration

See merge request !2
parents bbfd1749 b425904b
Pipeline #88480 passed
@@ -2,6 +2,8 @@
dockers/.env
.vscode/*
*.pyc
logs/
.env
# contains data for local tests
.coverage

default:
  image: python:3.9-slim

variables:
  DOCKER_TLS_CERTDIR: ""
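  # an empty DOCKER_TLS_CERTDIR turns off TLS between the docker client and the docker:dind service used below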
# before script copied from gitlab docs
before_script:
  - 'command -v ssh-agent >/dev/null || ( apt-get update -y && apt-get install openssh-client gcc libxslt-dev libffi-dev libssl-dev build-essential python3-dev -y )'
@@ -17,6 +21,8 @@ test:
    name: apache/airflow:2.1.3
    entrypoint: [""]
  before_script:
    - echo "DEBUG:"
    - pip --version
    - airflow db init
    - pip install -r requirements.txt
    - pip install nose==1.3.7
@@ -30,6 +36,26 @@ test:
    - airflow dags test testdag 2021-08-18
    - nosetests
build-custom-image:
  stage: build
  image: docker:latest
  services:
    - docker:dind
  when: manual
  tags:
    - laptop
  variables:
    IMAGE_COMMIT_TAG: $CI_REGISTRY_IMAGE/eflows-airflow:$CI_COMMIT_SHORT_SHA
    IMAGE_LATEST_TAG: $CI_REGISTRY_IMAGE/eflows-airflow:latest
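  # the script pushes an immutable per-commit tag plus a moving "latest" tag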
  script:
    - docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY
    - docker build --no-cache=true --pull -t $IMAGE_COMMIT_TAG -f dockers/eflows-airflow.docker .
    - docker push $IMAGE_COMMIT_TAG
    - docker tag $IMAGE_COMMIT_TAG $IMAGE_LATEST_TAG
    - docker push $IMAGE_LATEST_TAG
deploy-test:
  stage: deploy
  environment: Testing
@@ -40,6 +66,8 @@ deploy-test:
    OS_IDENTITY_API_VERSION: 3
    OS_REGION_NAME: "HDFCloud"
    OS_INTERFACE: public
    AIRFLOW__SECRETS__BACKEND_KWARGS: '{"url" : "https://zam10036.zam.kfa-juelich.de", "user" : "${DATACAT_TESTING_USERNAME}", "password" : "${DATACAT_TESTING_PASSWORD}"}'
    AIRFLOW__SECRETS__BACKEND: datacat_integration.secrets.DatacatSecretsBackend
    FLOATING_IP: 134.94.199.220
  script:
    - echo "Starting the full testing deployment of airflows example."
......
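The two new variables above switch Airflow's secrets lookup over to the datacat service: AIRFLOW__SECRETS__BACKEND names the backend class, and Airflow parses AIRFLOW__SECRETS__BACKEND_KWARGS as JSON and hands the keys to that class's constructor. A minimal sketch of the shape such a backend takes, assuming constructor keywords matching the JSON keys; the class and method bodies here are illustrative, and the real implementation ships in the datacat_integration package:

from typing import Optional

from airflow.secrets import BaseSecretsBackend

class SketchSecretsBackend(BaseSecretsBackend):
    # Illustrative stand-in for datacat_integration.secrets.DatacatSecretsBackend.
    def __init__(self, url: str = "", user: str = "", password: str = "", **kwargs):
        super().__init__(**kwargs)
        self.url = url            # e.g. "https://zam10036.zam.kfa-juelich.de"
        self.user = user          # filled from ${DATACAT_TESTING_USERNAME} in CI
        self.password = password  # filled from ${DATACAT_TESTING_PASSWORD} in CI

    def get_conn_uri(self, conn_id: str) -> Optional[str]:
        # Resolve conn_id against the external store and return an Airflow
        # connection URI, or None so the lookup falls through to environment
        # variables and the metadata database.
        return None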
from typing import Dict
from airflow.decorators import dag, task
from airflow.models.connection import Connection
from airflow.utils.dates import days_ago
from airflow import settings
import logging
from sqlalchemy.orm.session import Session as SASession
from datacat_integration.secrets import DataCatConnectionWithSecrets
from datacat_integration.hooks import DataCatalogHook
default_args = {
    'owner': 'airflow',
}

connections_type = "airflow_connections"
substring_start = len(connections_type) + 1
substring_end = substring_start + 36  # length of a UUID4
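# conn_ids follow the pattern "<connections_type>/<uuid4>-connection", e.g.
# "airflow_connections/860355e9-975f-4253-9421-1815e20c879b-connection";
# the slice [substring_start:substring_end] recovers the 36-character UUID.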
log = logging.getLogger(__name__)
def get_conn_name(datacat_type: str, oid: str):
    return "{}/{}-connection".format(datacat_type, oid)

def get_normal_or_secret_property(key: str, props: Dict[str, str], secrets: Dict[str, str], default_value=None):
    return props.get(key, secrets.get(key, default_value))
def get_connection(hook: DataCatalogHook, datacat_type: str, oid: str):
    conn_id = get_conn_name(datacat_type, oid)
    secrets_connection = DataCatConnectionWithSecrets(hook.connection.url, hook.connection.user, hook.connection._password)
    datacat_entry: Dict[str, str] = secrets_connection.get_object(datacat_type, oid)['metadata']
    datacat_entry_secrets = secrets_connection.get_all_secret_key_value(datacat_type, oid)
    extra = {}
    predefined_keys = ['conn_type', 'description', 'host', 'login', 'password', 'schema', 'port']
    # build extra from non-predefined keys
    for key in datacat_entry:
        if key not in predefined_keys:
            extra[key] = datacat_entry[key]
    for key in datacat_entry_secrets:
        if key not in predefined_keys:
            extra[key] = datacat_entry_secrets[key]

    # port may be absent; int(None) would raise, and Connection accepts port=None
    port = get_normal_or_secret_property('port', datacat_entry, datacat_entry_secrets)
    return Connection(
        conn_id=conn_id,
        conn_type=get_normal_or_secret_property('conn_type', datacat_entry, datacat_entry_secrets),
        description=get_normal_or_secret_property('description', datacat_entry, datacat_entry_secrets, 'Automatically generated Connection from the datacatalog object {}/{}'.format(connections_type, oid)),
        host=get_normal_or_secret_property('host', datacat_entry, datacat_entry_secrets),
        login=get_normal_or_secret_property('login', datacat_entry, datacat_entry_secrets),
        password=get_normal_or_secret_property('password', datacat_entry, datacat_entry_secrets),
        schema=get_normal_or_secret_property('schema', datacat_entry, datacat_entry_secrets),
        port=int(port) if port is not None else None,
        extra=extra
    )
@dag(default_args=default_args, schedule_interval='@hourly', start_date=days_ago(1), tags=['dls-service-dag'])
def sync_connections():

    @task
    def list_catalog_connections(**kwargs):
        hook = DataCatalogHook("datacatalog")
        objects = hook.list_type(connections_type)
        oid_list = [element[1] for element in objects]
        return oid_list

    @task
    def list_airflow_connections(**kwargs):
        session: SASession = settings.Session()
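        # consider only the conn_ids this DAG creates: "airflow_connections/<uuid>-connection"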
        conns = session.query(Connection).filter(Connection.conn_id.like("{}/%-connection".format(connections_type)))
        oid_list = [conn.conn_id[substring_start:substring_end] for conn in conns]
        return oid_list

    @task
    def get_add_list(catalog_connections, airflow_connections, **kwargs):
        return list(set(catalog_connections).difference(airflow_connections))

    @task
    def get_remove_list(catalog_connections, airflow_connections, **kwargs):
        return list(set(airflow_connections).difference(catalog_connections))
    @task
    def remove_connections(oid_list, **kwargs):
        log.info("Going to remove from connections: " + ','.join(oid_list))
        session: SASession = settings.Session()
        for oid in oid_list:
            session.query(Connection).filter(Connection.conn_id == get_conn_name(connections_type, oid)).delete()
        session.commit()
    @task
    def add_connections(oid_list, **kwargs):
        log.info("Going to add to connections: " + ','.join(oid_list))
        hook = DataCatalogHook("datacatalog")
        connections = []
        for oid in oid_list:
            connections.append(get_connection(hook, connections_type, oid))

        session = settings.Session()
        # no check for existence necessary, since it is handled by get_add_list()
        for conn in connections:
            session.add(conn)
        session.commit()
    cat_conn = list_catalog_connections()
    air_conn = list_airflow_connections()
    add_list = get_add_list(cat_conn, air_conn)
    remove_list = get_remove_list(cat_conn, air_conn)
    add_connections(add_list)
    remove_connections(remove_list)

dag = sync_connections()
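The task wiring performs a one-way sync by set difference: ids present in the catalog but missing from Airflow are added, and ids present only in Airflow are removed. A worked example of the two difference tasks, using plain sets and no Airflow at all:

# catalog holds a, b, c; the Airflow DB holds b, c, d
catalog = ["a", "b", "c"]
airflow_db = ["b", "c", "d"]
assert set(catalog).difference(airflow_db) == {"a"}    # get_add_list: add "a"
assert set(airflow_db).difference(catalog) == {"d"}    # get_remove_list: remove "d"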
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.hooks.base import BaseHook
default_args = {
    'owner': 'airflow',
}

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def test_secrets_backend():
    @task()
    def get_print_and_return_connection():
        conn = BaseHook.get_connection('860355e9-975f-4253-9421-1815e20c879b')
        print(conn.get_extra())

    get_print_and_return_connection()

dag = test_secrets_backend()
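This second DAG is a smoke test for the backend configured above: BaseHook.get_connection consults any configured secrets backend before environment variables and the metadata database, so a successful print confirms the datacat lookup end to end. The printed value is the raw JSON string stored in the connection's extra field; a parsed view is also available, sketched here with the same UUID and assuming the backend resolves it:

from airflow.hooks.base import BaseHook

conn = BaseHook.get_connection('860355e9-975f-4253-9421-1815e20c879b')
print(conn.get_extra())    # raw JSON string from the extra field
print(conn.extra_dejson)   # the same data parsed into a dict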
@@ -56,6 +56,8 @@ x-airflow-common:
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    AIRFLOW__SECRETS__BACKEND_KWARGS: ${AIRFLOW__SECRETS__BACKEND_KWARGS}
    AIRFLOW__SECRETS__BACKEND: ${AIRFLOW__SECRETS__BACKEND}
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ./dags:/opt/airflow/dags
......
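The compose change forwards the two backend settings from the host environment into every Airflow container via the shared x-airflow-common block, so a local deployment picks up the same datacat configuration as CI, presumably supplied through the gitignored dockers/.env seen at the top of this diff.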
FROM apache/airflow:2.2.3
USER root
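# git is installed as root, presumably so pip can resolve requirements that point at git repositories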
RUN apt update && apt install git -y
USER airflow
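# the official apache/airflow image expects pip installs to run as the airflow user, not root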
RUN pip --version
ADD requirements.txt /requirements.txt
RUN pip install -r /requirements.txt