Skip to content
Snippets Groups Projects
Commit 0d8b264b authored by Christian Boettcher's avatar Christian Boettcher
Browse files

add hooks, operators and secret backend

parent e672af7c
No related branches found
No related tags found
No related merge requests found
Pipeline #85558 passed
from typing import Any
from urllib.parse import urljoin
from airflow.providers.http.hooks.http import HttpHook
from airflow import settings
from airflow.models.connection import Connection
from requests.sessions import session
from requests import Session
from typing import Optional, Dict
from connection import DataCatConnection, DataCatalogEntry
from auth import BearerAuth
def get_connection_from_entry(data: Dict[str, Any], datacat_type: str, oid: str) -> Connection:
    """Build an Airflow connection from the metadata of a datacat entry.

    The ``conn_type``, ``host``, ``port`` and ``schema`` metadata fields are
    passed to the connection under the same names; every other metadata field
    is handed over as ``extra``. The connection id is
    ``<datacat_type>/<oid>-connection``.

    Raises:
        KeyError: if ``data`` has no ``metadata`` key or one of the four
            named fields is missing from it.
    """
    metadata = data['metadata']
    core_fields = ('conn_type', 'host', 'port', 'schema')
    # Resolve the mandatory fields first so a missing one fails before
    # anything else is assembled.
    core_values = {name: metadata[name] for name in core_fields}
    # Everything that is not a core field travels along as extra.
    remaining = {name: value for name, value in metadata.items() if name not in core_fields}
    return Connection(
        conn_id=f"{datacat_type}/{oid}-connection",
        description="Automatically generated Connection for the given entry.",
        extra=remaining,
        **core_values,
    )
class DataCatalogHook(HttpHook):
    """HTTP hook that exposes datacatalog operations through a ``DataCatConnection``.

    The hook resolves the Airflow connection named by ``http_conn_id`` and
    uses its base URL and credentials to build the ``DataCatConnection`` that
    all catalog calls are delegated to.
    """

    # Catalog client; populated in __init__ and refreshed by get_conn().
    connection: DataCatConnection = None

    def __init__(self, http_conn_id: str = 'datacatalog') -> None:
        """Create the hook and its catalog client for ``http_conn_id``."""
        super().__init__(method="GET", http_conn_id=http_conn_id)
        # The parent session is only needed here for base_url and the login
        # data. NOTE(review): assumes the parent attaches an auth object with
        # ``username``/``password`` attributes -- confirm for connections
        # without a login.
        conn = super().get_conn()
        self.connection = DataCatConnection(
            catalog_url=self.base_url,
            username=conn.auth.username,
            password=conn.auth.password,
        )

    def get_conn(self, headers: Optional[Dict[Any, Any]] = None) -> Session:
        """Return the parent's session with its auth replaced by a bearer token.

        The catalog client is rebuilt from the session's credentials and the
        token it returns from ``get_token()`` is installed as the session auth.
        """
        conn = super().get_conn(headers=headers)
        self.connection = DataCatConnection(
            catalog_url=self.base_url,
            username=conn.auth.username,
            password=conn.auth.password,
        )
        conn.auth = BearerAuth(self.connection.get_token())
        return conn

    def get_object(self, datacat_type: str, oid: str):
        """Return the catalog entry ``oid`` of type ``datacat_type``."""
        return self.connection.get_object(datacat_type, oid)

    def create_object(self, datacat_type: str, object: DataCatalogEntry):
        """Create ``object`` in the catalog under ``datacat_type``."""
        # The parameter name shadows the builtin but is kept for callers that
        # pass it by keyword.
        return self.connection.create_object(datacat_type, object)

    def list_type(self, datacat_type: str):
        """Return the catalog's listing for ``datacat_type``."""
        return self.connection.list_type(datacat_type)

    def create_get_object_connection(self, datacat_type: str, oid: str):
        """Register an Airflow connection for a catalog entry if none exists yet.

        The entry is fetched from the catalog and converted with
        ``get_connection_from_entry``; the resulting connection is stored in
        the Airflow metadata database unless a row with the same ``conn_id``
        is already present.
        """
        entry = self.get_object(datacat_type=datacat_type, oid=oid)
        # TODO decide some factors, such as connection type
        conn = get_connection_from_entry(entry, datacat_type, oid)
        session = settings.Session()
        try:
            existing = session.query(Connection).filter(Connection.conn_id == conn.conn_id).first()
            # The query already filters on conn_id, so any returned row is the
            # connection we would otherwise add.
            if existing is None:
                session.add(conn)
                session.commit()
        finally:
            # Release the database connection even when the query or commit fails.
            session.close()
from airflow.models.baseoperator import BaseOperator
from hooks import DataCatalogHook
class GetDatacatalogEntryOperator(BaseOperator):
    """Fetch one entry from the datacatalog and return its data.

    Args:
        datacat_type: catalog type the entry belongs to.
        oid: identifier of the entry within that type.
        datacat_conn_id: id of the Airflow connection handed to
            ``DataCatalogHook``; defaults to ``'datacatalog'``.
    """

    def __init__(
            self,
            datacat_type: str,
            oid: str,
            datacat_conn_id: str = 'datacatalog',
            **kwargs) -> None:
        super().__init__(**kwargs)
        self.datacat_type = datacat_type
        self.oid = oid
        self.datacat_conn_id = datacat_conn_id

    def execute(self, context):
        """Return whatever ``DataCatalogHook.get_object`` yields for the entry."""
        cat_conn = DataCatalogHook(self.datacat_conn_id)
        return cat_conn.get_object(self.datacat_type, self.oid)
class GetDatacatalogEntryConnectionOperator(BaseOperator):
    """Make sure an Airflow connection exists for a datacatalog entry.

    The work is delegated to ``DataCatalogHook.create_get_object_connection``;
    the task itself returns nothing.

    Args:
        datacat_type: catalog type the entry belongs to.
        oid: identifier of the entry within that type.
        datacat_conn_id: id of the Airflow connection handed to
            ``DataCatalogHook``; defaults to ``'datacatalog'``.
    """

    def __init__(
            self,
            datacat_type: str,
            oid: str,
            datacat_conn_id: str = 'datacatalog',
            **kwargs) -> None:
        super().__init__(**kwargs)
        self.datacat_type = datacat_type
        self.oid = oid
        self.datacat_conn_id = datacat_conn_id

    def execute(self, context):
        """Create the connection for the configured entry via the hook."""
        hook = DataCatalogHook(self.datacat_conn_id)
        hook.create_get_object_connection(self.datacat_type, self.oid)
\ No newline at end of file
from typing import Any, Dict, Set
from urllib.parse import urljoin
from airflow.secrets import BaseSecretsBackend
from airflow.models.connection import Connection
import requests
from connection import DataCatConnection
from hooks import DataCatalogHook, get_connection_from_entry
# Datacat type passed to the catalog when the secrets backend looks up an
# entry and its secrets for a requested connection id.
connection_backend_type = "airflow_connections"
def get_connection_with_secrets_from_entry(data: Dict[str, Any], secrets: Dict[str, str] , datacat_type: str, oid: str) -> Connection:
    """Build an Airflow connection from a datacat entry and its secrets.

    The connection is first created from the entry metadata by
    ``get_connection_from_entry``. ``secrets['login']`` and
    ``secrets['password']`` become the connection credentials; every other
    secret is merged into the connection's extra data.

    Raises:
        KeyError: if ``secrets`` lacks ``'password'`` or ``'login'``.
    """
    # Local import so the block does not depend on the file-level imports.
    import json

    conn = get_connection_from_entry(data, datacat_type, oid)
    conn.password = secrets['password']
    conn.login = secrets['login']
    # Connection.extra is a serialized JSON string, not a dict, so item
    # assignment on it fails. Work on the decoded copy and serialize it back.
    extra = dict(conn.extra_dejson)
    extra.update({
        key: value
        for key, value in secrets.items()
        if key not in ('password', 'login')
    })
    conn.extra = json.dumps(extra)
    return conn
class DataCatConnectionWithSecrets(DataCatConnection):
    """``DataCatConnection`` extended with read access to an entry's secrets.

    All requests are authenticated with the bearer token held in
    ``self._auth_token`` and are resolved relative to ``self.url``.
    NOTE(review): both attributes are presumably set by the parent class --
    confirm against ``DataCatConnection``.
    """

    def __init__(self, catalog_url: str = "", username: str = "", password: str = ""):
        super().__init__(catalog_url=catalog_url, username=username, password=password)

    def _get_json(self, path: str) -> Any:
        """GET ``path`` relative to the catalog URL and return the decoded JSON body.

        Raises:
            ConnectionError: with the response text when the response is not ok.
        """
        headers = {
            'accept': 'application/json',
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {self._auth_token}',
        }
        response = requests.get(urljoin(self.url, path), headers=headers)
        if not response.ok:
            raise ConnectionError(response.text)
        return response.json()

    def get_secrets_keys(self, datacat_type: str, oid: str) -> Set:
        """Return the JSON body of ``<datacat_type>/<oid>/secrets``."""
        return self._get_json(f"{datacat_type}/{oid}/secrets")

    def get_all_secret_key_value(self, datacat_type: str, oid: str) -> Dict[str,str]:
        """Return the JSON body of ``<datacat_type>/<oid>/secrets_values``."""
        return self._get_json(f"{datacat_type}/{oid}/secrets_values")

    def get_secret_value(self, datacat_type: str, oid: str, key: str) -> str:
        """Return the JSON body of ``<datacat_type>/<oid>/secrets/<key>``."""
        return self._get_json(f"{datacat_type}/{oid}/secrets/{key}")
class DatacatSecretsBackend(BaseSecretsBackend):
    """A secrets backend that builds Airflow connections from datacatalog entries."""

    # TODO init with datacat elevated login to prevent secrets access from jobs

    def get_connection(self, conn_id: str):
        """Return a Connection built from the ``<connection_type>/<conn_id>`` catalog object.

        The entry and its secrets are read from the catalog with the
        credentials of the ``datacatalog`` hook. Returns ``None`` for every
        ``conn_id`` other than the single oid currently enabled for testing.
        """
        # only for testing: check that a specific oid has been requested
        if conn_id != "860355e9-975f-4253-9421-1815e20c879b":
            return None

        hook = DataCatalogHook("datacatalog")
        secrets_conn = DataCatConnectionWithSecrets(
            hook.connection.url,
            hook.connection.user,
            hook.connection._password,
        )
        data = secrets_conn.get_object(connection_backend_type, conn_id)
        secrets = secrets_conn.get_all_secret_key_value(connection_backend_type, conn_id)
        # The secrets-aware builder must be used here: get_connection_from_entry
        # takes no secrets argument and would reject this four-argument call.
        return get_connection_with_secrets_from_entry(data, secrets, connection_backend_type, conn_id)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment