diff --git a/src/datacat_integration/secrets.py b/src/datacat_integration/secrets.py index 2bf57de24cf0f2328a7fcc9d798880a9de1e0980..3ce514c4a71646d91da83d74865106a87694f17d 100644 --- a/src/datacat_integration/secrets.py +++ b/src/datacat_integration/secrets.py @@ -54,72 +54,72 @@ def get_connection_with_secrets_from_entry(data: Dict[str, Any], secrets: Dict[s conn.extra = extra return conn - class DataCatConnectionWithSecrets(DataCatConnection): - def __init__(self, catalog_url: str = "", username: str = "", password: str = ""): - super().__init__(catalog_url=catalog_url, username=username, password=password) - - def get_secrets_keys(self, datacat_type: str, oid: str) -> Set: - headers = { - 'accept' : 'application/json', - 'Content-Type' : 'application/json', - 'Authorization' : 'Bearer {}'.format(self._auth_token) - } - url = urljoin(self.url, f"{datacat_type}/{oid}/secrets") - response = requests.get(url, headers=headers) - if response.ok: - return response.json() - else: - raise ConnectionError(response.text) - - def get_all_secret_key_value(self, datacat_type: str, oid: str) -> Dict[str,str]: - headers = { - 'accept' : 'application/json', - 'Content-Type' : 'application/json', - 'Authorization' : 'Bearer {}'.format(self._auth_token) - } - url = urljoin(self.url, f"{datacat_type}/{oid}/secrets_values") - response = requests.get(url, headers=headers) - if response.ok: - return response.json() - else: - raise ConnectionError(response.text) - - def get_secret_value(self, datacat_type: str, oid: str, key: str) -> str: - headers = { - 'accept' : 'application/json', - 'Content-Type' : 'application/json', - 'Authorization' : 'Bearer {}'.format(self._auth_token) - } - url = urljoin(self.url, f"{datacat_type}/{oid}/secrets/{key}") - response = requests.get(url, headers=headers) - if response.ok: - return response.json() - else: - raise ConnectionError(response.text) - - class DatacatSecretsBackend(BaseSecretsBackend): - """A Backend for directly managing connections in apache airflow""" - - def __init__(self, **kwargs): - log.debug("Init of Datacat Secrets Backend") - super().__init__(**kwargs) - self.url = kwargs["url"] - self.user = kwargs["user"] - self.password = kwargs["password"] - - def get_connection(self, conn_id: str): - """returns a Connection object created from the <conenction_type>/<conn_id> object in the datacatalog""" - # only for testing: check that a specific oid has been requested - log.debug(f"Get connection: {conn_id}") - if conn_id != "860355e9-975f-4253-9421-1815e20c879b": - return None - - secrets_conn = DataCatConnectionWithSecrets(self.url, self.user, self.password) - data = secrets_conn.get_object(connection_backend_type, conn_id) - secrets = secrets_conn.get_all_secret_key_value(connection_backend_type, conn_id) - data = secrets_conn.get_object(connection_backend_type, conn_id) - secrets = secrets_conn.get_all_secret_key_value(connection_backend_type, conn_id) - conn = get_connection_from_entry(data, secrets, connection_backend_type, conn_id) - return conn - - +class DataCatConnectionWithSecrets(DataCatConnection): + def __init__(self, catalog_url: str = "", username: str = "", password: str = ""): + super().__init__(catalog_url=catalog_url, username=username, password=password) + + def get_secrets_keys(self, datacat_type: str, oid: str) -> Set: + headers = { + 'accept' : 'application/json', + 'Content-Type' : 'application/json', + 'Authorization' : 'Bearer {}'.format(self._auth_token) + } + url = urljoin(self.url, f"{datacat_type}/{oid}/secrets") + response = requests.get(url, headers=headers) + if response.ok: + return response.json() + else: + raise ConnectionError(response.text) + + def get_all_secret_key_value(self, datacat_type: str, oid: str) -> Dict[str,str]: + headers = { + 'accept' : 'application/json', + 'Content-Type' : 'application/json', + 'Authorization' : 'Bearer {}'.format(self._auth_token) + } + url = urljoin(self.url, f"{datacat_type}/{oid}/secrets_values") + response = requests.get(url, headers=headers) + if response.ok: + return response.json() + else: + raise ConnectionError(response.text) + + def get_secret_value(self, datacat_type: str, oid: str, key: str) -> str: + headers = { + 'accept' : 'application/json', + 'Content-Type' : 'application/json', + 'Authorization' : 'Bearer {}'.format(self._auth_token) + } + url = urljoin(self.url, f"{datacat_type}/{oid}/secrets/{key}") + response = requests.get(url, headers=headers) + if response.ok: + return response.json() + else: + raise ConnectionError(response.text) + +class DatacatSecretsBackend(BaseSecretsBackend): + """A Backend for directly managing connections in apache airflow""" + + def __init__(self, **kwargs): + log.debug("Init of Datacat Secrets Backend") + super().__init__(**kwargs) + self.url = kwargs["url"] + self.user = kwargs["user"] + self.password = kwargs["password"] + + def get_connection(self, conn_id: str): + """returns a Connection object created from the <conenction_type>/<conn_id> object in the datacatalog""" + # only for testing: check that a specific oid has been requested + log.debug(f"Get connection: {conn_id}") + if conn_id != "860355e9-975f-4253-9421-1815e20c879b": + return None + + secrets_conn = DataCatConnectionWithSecrets(self.url, self.user, self.password) + data = secrets_conn.get_object(connection_backend_type, conn_id) + secrets = secrets_conn.get_all_secret_key_value(connection_backend_type, conn_id) + data = secrets_conn.get_object(connection_backend_type, conn_id) + secrets = secrets_conn.get_all_secret_key_value(connection_backend_type, conn_id) + conn = get_connection_from_entry(data, secrets, connection_backend_type, conn_id) + return conn + +