Skip to content
Snippets Groups Projects
Commit 4d7df92e authored by Christian Boettcher's avatar Christian Boettcher
Browse files

move function from hooks to connection to prevent circular dependencies on import hooks

parent 3b640fde
Branches
Tags
No related merge requests found
from typing import Dict
from typing import Dict, Any
import uuid
import json
from urllib.parse import urljoin
from airflow.models.connection import Connection
import requests
def get_connection_from_entry(data: Dict[str, Any], datacat_type: str, oid: str) -> Connection:
"""returns an airflow connection from the data provided in the datacat entry."""
conn_type = data['metadata']['conn_type']
host = data['metadata']['host']
port = data['metadata']['port']
schema = data['metadata']['schema']
conn_id = f"{datacat_type}/{oid}-connection"
# set all remaining metadata as extra
extra = {}
for key in data['metadata'].keys():
if key in ['conn_type', 'host', 'port', 'schema']:
continue
extra[key] = data['metadata'][key]
return Connection(
conn_id=conn_id,
conn_type=conn_type,
host=host,
port=port,
description="Automatically generated Connection for the given entry.",
schema=schema,
extra=extra
)
class DataCatalogEntry:
"""A datatype representing an entry in the datacatalog."""
name: str = ""
......
from typing import Any
from urllib.parse import urljoin
from airflow.providers.http.hooks.http import HttpHook
from airflow import settings
from airflow.models.connection import Connection
from requests.sessions import session
from requests import Session
from typing import Optional, Dict
from datacat_integration.connection import DataCatConnection, DataCatalogEntry
from datacat_integration.connection import DataCatConnection, DataCatalogEntry, get_connection_from_entry
from datacat_integration.auth import BearerAuth
def get_connection_from_entry(data: Dict[str, Any], datacat_type: str, oid: str) -> Connection:
"""returns an airflow connection from the data provided in the datacat entry."""
conn_type = data['metadata']['conn_type']
host = data['metadata']['host']
port = data['metadata']['port']
schema = data['metadata']['schema']
conn_id = f"{datacat_type}/{oid}-connection"
# set all remaining metadata as extra
extra = {}
for key in data['metadata'].keys():
if key in ['conn_type', 'host', 'port', 'schema']:
continue
extra[key] = data['metadata'][key]
return Connection(
conn_id=conn_id,
conn_type=conn_type,
host=host,
port=port,
description="Automatically generated Connection for the given entry.",
schema=schema,
extra=extra
)
class DataCatalogHook(HttpHook):
connection: DataCatConnection = None
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment