From 4d7df92e145297c5ada37d79b104bbaf003ab998 Mon Sep 17 00:00:00 2001 From: Christian Boettcher <c.boettcher@fz-juelich.de> Date: Tue, 14 Dec 2021 09:03:31 +0100 Subject: [PATCH] move function from hooks to connection to prevent circular dependencies on import hooks --- src/datacat_integration/connection.py | 29 ++++++++++++++++++++++++++- src/datacat_integration/hooks.py | 29 +-------------------------- 2 files changed, 29 insertions(+), 29 deletions(-) diff --git a/src/datacat_integration/connection.py b/src/datacat_integration/connection.py index bf22979..fd78ab1 100644 --- a/src/datacat_integration/connection.py +++ b/src/datacat_integration/connection.py @@ -1,11 +1,38 @@ -from typing import Dict +from typing import Dict, Any import uuid import json from urllib.parse import urljoin +from airflow.models.connection import Connection + import requests + +def get_connection_from_entry(data: Dict[str, Any], datacat_type: str, oid: str) -> Connection: + """returns an airflow connection from the data provided in the datacat entry.""" + conn_type = data['metadata']['conn_type'] + host = data['metadata']['host'] + port = data['metadata']['port'] + schema = data['metadata']['schema'] + conn_id = f"{datacat_type}/{oid}-connection" + # set all remaining metadata as extra + extra = {} + for key in data['metadata'].keys(): + if key in ['conn_type', 'host', 'port', 'schema']: + continue + extra[key] = data['metadata'][key] + + return Connection( + conn_id=conn_id, + conn_type=conn_type, + host=host, + port=port, + description="Automatically generated Connection for the given entry.", + schema=schema, + extra=extra + ) + class DataCatalogEntry: """A datatype representing an entry in the datacatalog.""" name: str = "" diff --git a/src/datacat_integration/hooks.py b/src/datacat_integration/hooks.py index 4559234..17bedd6 100644 --- a/src/datacat_integration/hooks.py +++ b/src/datacat_integration/hooks.py @@ -1,42 +1,15 @@ from typing import Any -from urllib.parse import urljoin from airflow.providers.http.hooks.http import HttpHook from airflow import settings from airflow.models.connection import Connection -from requests.sessions import session from requests import Session from typing import Optional, Dict -from datacat_integration.connection import DataCatConnection, DataCatalogEntry +from datacat_integration.connection import DataCatConnection, DataCatalogEntry, get_connection_from_entry from datacat_integration.auth import BearerAuth -def get_connection_from_entry(data: Dict[str, Any], datacat_type: str, oid: str) -> Connection: - """returns an airflow connection from the data provided in the datacat entry.""" - conn_type = data['metadata']['conn_type'] - host = data['metadata']['host'] - port = data['metadata']['port'] - schema = data['metadata']['schema'] - conn_id = f"{datacat_type}/{oid}-connection" - # set all remaining metadata as extra - extra = {} - for key in data['metadata'].keys(): - if key in ['conn_type', 'host', 'port', 'schema']: - continue - extra[key] = data['metadata'][key] - - return Connection( - conn_id=conn_id, - conn_type=conn_type, - host=host, - port=port, - description="Automatically generated Connection for the given entry.", - schema=schema, - extra=extra - ) - - class DataCatalogHook(HttpHook): connection: DataCatConnection = None -- GitLab