# ---------------------------------------------------------------------------
# File: datacat_integration/hooks.py
# ---------------------------------------------------------------------------
import json
from typing import Any, Dict, Optional
from urllib.parse import urljoin

from airflow import settings
from airflow.models.connection import Connection
from airflow.providers.http.hooks.http import HttpHook
from requests import Session
from requests.sessions import session

from auth import BearerAuth
from connection import DataCatConnection, DataCatalogEntry

# Metadata keys that map onto dedicated Connection fields; every other
# metadata key is stored in the connection's ``extra``.
_CORE_CONNECTION_FIELDS = ('conn_type', 'host', 'port', 'schema')


def get_connection_from_entry(data: Dict[str, Any], datacat_type: str, oid: str) -> Connection:
    """Return an Airflow connection built from the data of a datacat entry.

    Args:
        data: The datacat entry; ``data['metadata']`` must contain the keys
            ``conn_type``, ``host``, ``port`` and ``schema``.
        datacat_type: Type of the entry, used as prefix of the connection id.
        oid: Id of the entry, used as part of the connection id.

    Returns:
        A ``Connection`` with the id ``"<datacat_type>/<oid>-connection"``.
        All metadata keys besides the four listed above are stored in its
        ``extra`` field.

    Raises:
        KeyError: If ``metadata`` or one of the four required keys is missing.
    """
    metadata = data['metadata']
    extra = {
        key: value
        for key, value in metadata.items()
        if key not in _CORE_CONNECTION_FIELDS
    }

    return Connection(
        conn_id=f"{datacat_type}/{oid}-connection",
        conn_type=metadata['conn_type'],
        host=metadata['host'],
        port=metadata['port'],
        description="Automatically generated Connection for the given entry.",
        schema=metadata['schema'],
        # Serialise explicitly: the extra field is persisted as a JSON string.
        # NOTE(review): assumes all metadata values are JSON-serialisable.
        extra=json.dumps(extra),
    )


class DataCatalogHook(HttpHook):
    """HTTP hook that talks to the data catalog through a ``DataCatConnection``."""

    # Catalog client built from the credentials of the Airflow HTTP connection.
    connection: Optional[DataCatConnection] = None

    def __init__(self, http_conn_id: str = 'datacatalog') -> None:
        """Create the hook and eagerly build the catalog client.

        Args:
            http_conn_id: Id of the Airflow HTTP connection describing the
                data catalog.
        """
        super().__init__(method="GET", http_conn_id=http_conn_id)
        # The parent's get_conn() is used on purpose: the overridden version
        # below would additionally request a token.
        self.connection = self._build_datacat_connection(super().get_conn())

    def _build_datacat_connection(self, http_session: Session) -> DataCatConnection:
        """Build the catalog client from the credentials attached to *http_session*."""
        # NOTE(review): assumes the Airflow connection has a login, so that
        # http_session.auth is an object with username/password - confirm.
        credentials = http_session.auth
        return DataCatConnection(
            catalog_url=self.base_url,
            username=credentials.username,
            password=credentials.password,
        )

    def get_conn(self, headers: Optional[Dict[Any, Any]] = None) -> Session:
        """Return a session that authenticates with a datacat bearer token.

        The catalog client in ``self.connection`` is rebuilt on every call.

        Args:
            headers: Passed through to ``HttpHook.get_conn``.
        """
        http_session = super().get_conn(headers=headers)
        self.connection = self._build_datacat_connection(http_session)
        http_session.auth = BearerAuth(self.connection.get_token())
        return http_session

    def get_object(self, datacat_type: str, oid: str):
        """Return the entry ``oid`` of type ``datacat_type`` from the catalog client."""
        return self.connection.get_object(datacat_type, oid)

    def create_object(self, datacat_type: str, object: DataCatalogEntry):
        """Create ``object`` as a new entry of type ``datacat_type``."""
        return self.connection.create_object(datacat_type, object)

    def list_type(self, datacat_type: str):
        """Return what the catalog client lists for ``datacat_type``."""
        return self.connection.list_type(datacat_type)

    def create_get_object_connection(self, datacat_type: str, oid: str):
        """Store an Airflow connection for a datacat entry unless it already exists.

        The entry is fetched, converted with ``get_connection_from_entry`` and
        added to the Airflow metadata database when no connection with the
        same ``conn_id`` is present.
        """
        entry = self.get_object(datacat_type=datacat_type, oid=oid)
        # TODO decide some factors, such as connection type
        conn = get_connection_from_entry(entry, datacat_type, oid)

        db_session = settings.Session()
        try:
            existing = (
                db_session.query(Connection)
                .filter(Connection.conn_id == conn.conn_id)
                .first()
            )
            if existing is None:
                db_session.add(conn)
                db_session.commit()
        finally:
            # Always hand the session back, also when the query or commit fails.
            db_session.close()


# ---------------------------------------------------------------------------
# File: datacat_integration/operators.py
# ---------------------------------------------------------------------------
from airflow.models.baseoperator import BaseOperator

from hooks import DataCatalogHook


class GetDatacatalogEntryOperator(BaseOperator):
    """This task returns the data for an entry in the datacatalog."""

    def __init__(
            self,
            datacat_type: str,
            oid: str,
            **kwargs) -> None:
        """Store which entry to fetch; ``kwargs`` go to ``BaseOperator``."""
        super().__init__(**kwargs)
        self.datacat_type = datacat_type
        self.oid = oid

    def execute(self, context):
        """Fetch the entry through the ``datacatalog`` hook and return it."""
        hook = DataCatalogHook('datacatalog')
        return hook.get_object(self.datacat_type, self.oid)


class GetDatacatalogEntryConnectionOperator(BaseOperator):
    """This task creates an airflow connection for an entry in the datacatalog."""

    def __init__(
            self,
            datacat_type: str,
            oid: str,
            **kwargs) -> None:
        """Store which entry to convert; ``kwargs`` go to ``BaseOperator``."""
        super().__init__(**kwargs)
        self.datacat_type = datacat_type
        self.oid = oid

    def execute(self, context):
        """Create the connection for the entry through the ``datacatalog`` hook."""
        hook = DataCatalogHook("datacatalog")
        hook.create_get_object_connection(self.datacat_type, self.oid)


# ---------------------------------------------------------------------------
# File: datacat_integration/secrets.py
# NOTE(review): this module name is the same as the standard library module
# ``secrets``; with the top-level import style used here it may shadow it.
# ---------------------------------------------------------------------------
import json
from typing import Any, Dict, Set
from urllib.parse import urljoin

import requests
from airflow.models.connection import Connection
from airflow.secrets import BaseSecretsBackend

from connection import DataCatConnection
from hooks import DataCatalogHook, get_connection_from_entry

connection_backend_type = "airflow_connections"


def get_connection_with_secrets_from_entry(data: Dict[str, Any], secrets: Dict[str, str], datacat_type: str, oid: str) -> Connection:
    """Return an Airflow connection built from a datacat entry and its secrets.

    Args:
        data: The datacat entry, see ``get_connection_from_entry``.
        secrets: Secret key/value pairs of the entry; must contain
            ``password`` and ``login``.
        datacat_type: Type of the entry.
        oid: Id of the entry.

    Returns:
        The connection from ``get_connection_from_entry`` with login and
        password set and all remaining secrets merged into ``extra``.

    Raises:
        KeyError: If ``password`` or ``login`` is missing from ``secrets``.
    """
    conn = get_connection_from_entry(data, datacat_type, oid)
    conn.password = secrets['password']
    conn.login = secrets['login']

    # ``conn.extra`` is a JSON string, so merge on the parsed dict and
    # serialise the result again instead of item-assigning into the string.
    extra = dict(conn.extra_dejson)
    extra.update(
        (key, value)
        for key, value in secrets.items()
        if key not in ('password', 'login')
    )
    conn.extra = json.dumps(extra)
    return conn


class DataCatConnectionWithSecrets(DataCatConnection):
    """Catalog client that can additionally read the secrets of an entry."""

    def __init__(self, catalog_url: str = "", username: str = "", password: str = ""):
        """Forward all arguments to ``DataCatConnection``."""
        super().__init__(catalog_url=catalog_url, username=username, password=password)

    def _get_json(self, path: str) -> Any:
        """GET ``path`` relative to the catalog url and return the decoded JSON body.

        Raises:
            ConnectionError: With the response text, if the response is not ok.
        """
        headers = {
            'accept': 'application/json',
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {self._auth_token}',
        }
        response = requests.get(urljoin(self.url, path), headers=headers)
        if not response.ok:
            raise ConnectionError(response.text)
        return response.json()

    def get_secrets_keys(self, datacat_type: str, oid: str) -> Set:
        """Return the JSON body of ``<datacat_type>/<oid>/secrets``."""
        return self._get_json(f"{datacat_type}/{oid}/secrets")

    def get_all_secret_key_value(self, datacat_type: str, oid: str) -> Dict[str, str]:
        """Return the JSON body of ``<datacat_type>/<oid>/secrets_values``."""
        return self._get_json(f"{datacat_type}/{oid}/secrets_values")

    def get_secret_value(self, datacat_type: str, oid: str, key: str) -> str:
        """Return the JSON body of ``<datacat_type>/<oid>/secrets/<key>``."""
        return self._get_json(f"{datacat_type}/{oid}/secrets/{key}")


class DatacatSecretsBackend(BaseSecretsBackend):
    """A Backend for directly managing connections in apache airflow"""

    # TODO init with datacat elevated login to prevent secrets access from jobs

    def get_connection(self, conn_id: str):
        """Return a Connection created from the <connection_type>/<conn_id> object in the datacatalog.

        Returns ``None`` for every ``conn_id`` except the single test oid
        checked below.
        """
        # only for testing: check that a specific oid has been requested
        if conn_id != "860355e9-975f-4253-9421-1815e20c879b":
            return None

        hook = DataCatalogHook("datacatalog")
        secrets_conn = DataCatConnectionWithSecrets(
            hook.connection.url,
            hook.connection.user,
            hook.connection._password,
        )
        data = secrets_conn.get_object(connection_backend_type, conn_id)
        secrets = secrets_conn.get_all_secret_key_value(connection_backend_type, conn_id)
        # The secrets-aware builder takes (data, secrets, type, oid); the plain
        # get_connection_from_entry only accepts three arguments.
        return get_connection_with_secrets_from_entry(data, secrets, connection_backend_type, conn_id)