From 27e81d6d3d583c4bd8d8fd924eae6a485c557e72 Mon Sep 17 00:00:00 2001 From: Jedrzej Rybicki <j.rybicki@fz-juelich.de> Date: Mon, 13 Dec 2021 16:19:23 +0100 Subject: [PATCH] Update secrets.py --- src/datacat_integration/secrets.py | 165 +++++++++++++++++------------ 1 file changed, 97 insertions(+), 68 deletions(-) diff --git a/src/datacat_integration/secrets.py b/src/datacat_integration/secrets.py index 33d18a3..2bf57de 100644 --- a/src/datacat_integration/secrets.py +++ b/src/datacat_integration/secrets.py @@ -1,19 +1,45 @@ from typing import Any, Dict, Set from urllib.parse import urljoin from airflow.secrets import BaseSecretsBackend -from airflow.models.connection import Connection +#from airflow.models.connection import Connection import requests import logging from datacat_integration.connection import DataCatConnection -from datacat_integration.hooks import get_connection_from_entry +#from datacat_integration.hooks import get_connection_from_entry connection_backend_type = "airflow_connections" log = logging.getLogger(__name__) +def get_connection_from_entry(data: Dict[str, Any], datacat_type: str, oid: str): + """returns an aiflow connection from the data provided in the datacat entry.""" + conn_type = data['metadata']['conn_type'] + host = data['metadata']['host'] + port = data['metadata']['port'] + schema = data['metadata']['schema'] + conn_id = f"{datacat_type}/{oid}-connection" + # set all remaining metadata as extra + extra = {} + for key in data['metadata'].keys(): + if key in ['conn_type', 'host', 'port', 'schema']: + continue + extra[key] = data['metadata'][key] + + return dict() + #Connection( + # conn_id=conn_id, + # conn_type=conn_type, + # host=host, + # port=port, + # description="Automatically generated Connection for the given entry.", + # schema=schema, + # extra=extra + #) + -def get_connection_with_secrets_from_entry(data: Dict[str, Any], secrets: Dict[str, str] , datacat_type: str, oid: str) -> Connection: + +def get_connection_with_secrets_from_entry(data: Dict[str, Any], secrets: Dict[str, str] , datacat_type: str, oid: str): """returns an aiflow connection from the data provided in the datacat entry and the secrets.""" conn = get_connection_from_entry(data, datacat_type, oid) conn.password = secrets['password'] @@ -28,69 +54,72 @@ def get_connection_with_secrets_from_entry(data: Dict[str, Any], secrets: Dict[s conn.extra = extra return conn + class DataCatConnectionWithSecrets(DataCatConnection): + def __init__(self, catalog_url: str = "", username: str = "", password: str = ""): + super().__init__(catalog_url=catalog_url, username=username, password=password) + + def get_secrets_keys(self, datacat_type: str, oid: str) -> Set: + headers = { + 'accept' : 'application/json', + 'Content-Type' : 'application/json', + 'Authorization' : 'Bearer {}'.format(self._auth_token) + } + url = urljoin(self.url, f"{datacat_type}/{oid}/secrets") + response = requests.get(url, headers=headers) + if response.ok: + return response.json() + else: + raise ConnectionError(response.text) + + def get_all_secret_key_value(self, datacat_type: str, oid: str) -> Dict[str,str]: + headers = { + 'accept' : 'application/json', + 'Content-Type' : 'application/json', + 'Authorization' : 'Bearer {}'.format(self._auth_token) + } + url = urljoin(self.url, f"{datacat_type}/{oid}/secrets_values") + response = requests.get(url, headers=headers) + if response.ok: + return response.json() + else: + raise ConnectionError(response.text) + + def get_secret_value(self, datacat_type: str, oid: str, key: str) -> str: + headers = { + 'accept' : 'application/json', + 'Content-Type' : 'application/json', + 'Authorization' : 'Bearer {}'.format(self._auth_token) + } + url = urljoin(self.url, f"{datacat_type}/{oid}/secrets/{key}") + response = requests.get(url, headers=headers) + if response.ok: + return response.json() + else: + raise ConnectionError(response.text) + + class DatacatSecretsBackend(BaseSecretsBackend): + """A Backend for directly managing connections in apache airflow""" + + def __init__(self, **kwargs): + log.debug("Init of Datacat Secrets Backend") + super().__init__(**kwargs) + self.url = kwargs["url"] + self.user = kwargs["user"] + self.password = kwargs["password"] + + def get_connection(self, conn_id: str): + """returns a Connection object created from the <conenction_type>/<conn_id> object in the datacatalog""" + # only for testing: check that a specific oid has been requested + log.debug(f"Get connection: {conn_id}") + if conn_id != "860355e9-975f-4253-9421-1815e20c879b": + return None + + secrets_conn = DataCatConnectionWithSecrets(self.url, self.user, self.password) + data = secrets_conn.get_object(connection_backend_type, conn_id) + secrets = secrets_conn.get_all_secret_key_value(connection_backend_type, conn_id) + data = secrets_conn.get_object(connection_backend_type, conn_id) + secrets = secrets_conn.get_all_secret_key_value(connection_backend_type, conn_id) + conn = get_connection_from_entry(data, secrets, connection_backend_type, conn_id) + return conn -class DataCatConnectionWithSecrets(DataCatConnection): - def __init__(self, catalog_url: str = "", username: str = "", password: str = ""): - super().__init__(catalog_url=catalog_url, username=username, password=password) - - def get_secrets_keys(self, datacat_type: str, oid: str) -> Set: - headers = { - 'accept' : 'application/json', - 'Content-Type' : 'application/json', - 'Authorization' : 'Bearer {}'.format(self._auth_token) - } - url = urljoin(self.url, f"{datacat_type}/{oid}/secrets") - response = requests.get(url, headers=headers) - if response.ok: - return response.json() - else: - raise ConnectionError(response.text) - - def get_all_secret_key_value(self, datacat_type: str, oid: str) -> Dict[str,str]: - headers = { - 'accept' : 'application/json', - 'Content-Type' : 'application/json', - 'Authorization' : 'Bearer {}'.format(self._auth_token) - } - url = urljoin(self.url, f"{datacat_type}/{oid}/secrets_values") - response = requests.get(url, headers=headers) - if response.ok: - return response.json() - else: - raise ConnectionError(response.text) - - def get_secret_value(self, datacat_type: str, oid: str, key: str) -> str: - headers = { - 'accept' : 'application/json', - 'Content-Type' : 'application/json', - 'Authorization' : 'Bearer {}'.format(self._auth_token) - } - url = urljoin(self.url, f"{datacat_type}/{oid}/secrets/{key}") - response = requests.get(url, headers=headers) - if response.ok: - return response.json() - else: - raise ConnectionError(response.text) - -class DatacatSecretsBackend(BaseSecretsBackend): - """A Backend for directly managing connections in apache airflow""" - - def __init__(self, **kwargs): - log.debug("Init of Datacat Secrets Backend") - super().__init__(**kwargs) - self.url = kwargs["url"] - self.user = kwargs["user"] - self.password = kwargs["password"] - - def get_connection(self, conn_id: str): - """returns a Connection object created from the <conenction_type>/<conn_id> object in the datacatalog""" - # only for testing: check that a specific oid has been requested - log.debug(f"Get connection: {conn_id}") - if conn_id != "860355e9-975f-4253-9421-1815e20c879b": - return None - - secrets_conn = DataCatConnectionWithSecrets(self.url, self.user, self.password) - data = secrets_conn.get_object(connection_backend_type, conn_id) - secrets = secrets_conn.get_all_secret_key_value(connection_backend_type, conn_id) - conn = get_connection_from_entry(data, secrets, connection_backend_type, conn_id) - return conn \ No newline at end of file + -- GitLab