Skip to content
Snippets Groups Projects
Commit 71bae68a authored by Christian Boettcher's avatar Christian Boettcher
Browse files

add tests for getconnectionfromentry function

parent ef632c33
No related branches found
No related tags found
No related merge requests found
Pipeline #86565 passed
......@@ -11,17 +11,20 @@ def get_connection_from_entry(data: Dict[str, Any], datacat_type: str, oid: str)
"""returns an airflow connection from the data provided in the datacat entry."""
# delay import to prevent circular dependencies during config startup
from airflow.models.connection import Connection
conn_type = data['metadata'].get('conn_type')
host = data['metadata'].get('host')
port = data['metadata'].get('port')
schema = data['metadata'].get('schema')
metadata = data['metadata']
if isinstance(metadata, str):
raise TypeError("Datacat entry metadata is a string instead of a dict.")
conn_type = metadata.get('conn_type')
host = metadata.get('host')
port = metadata.get('port')
schema = metadata.get('schema')
conn_id = f"{datacat_type}/{oid}-connection"
# set all remaining metadata as extra
extra = {}
for key in data['metadata'].keys():
for key in metadata.keys():
if key in ['conn_type', 'host', 'port', 'schema']:
continue
extra[key] = data['metadata'][key]
extra[key] = metadata[key]
return Connection(
conn_id=conn_id,
......
......@@ -11,9 +11,6 @@ connection_backend_type = "airflow_connections"
log = logging.getLogger(__name__)
class Empty:
pass
def get_connection_with_secrets_from_entry(data: Dict[str, Any], secrets: Dict[str, str] , datacat_type: str, oid: str):
"""returns an aiflow connection from the data provided in the datacat entry and the secrets."""
conn = get_connection_from_entry(data, datacat_type, oid)
......@@ -88,7 +85,7 @@ class DatacatSecretsBackend(BaseSecretsBackend):
log.debug(f"Get connection: {conn_id}")
secrets_conn = DataCatConnectionWithSecrets(self.url, self.user, self.password)
data: Dict[str,str] = DataCatalogEntry.from_json(secrets_conn.get_entry(connection_backend_type, conn_id)).__dict__
data: Dict[str,str] = json.loads(secrets_conn.get_entry(connection_backend_type, conn_id))
secrets: Dict[str,str] = json.loads(secrets_conn.get_all_secret_key_value(connection_backend_type, conn_id))
conn = get_connection_with_secrets_from_entry(data, secrets, connection_backend_type, conn_id)
return conn
\ No newline at end of file
from unittest import TestCase
import os, random
import os, random, json
from dotenv import load_dotenv
from datacat_integration.connection import DataCatalogEntry, DataCatConnection
from datacat_integration.connection import DataCatalogEntry, DataCatConnection, get_connection_from_entry
class GetConnectionTest(TestCase):
def setUp(self):
pass
def test_get_connection(self):
data = {"name" : "foo", "url" : "bar", "metadata" : {"a" : "b", "c" : "d", "conn_type" : "http", "port" : "443", "host" : "test.com", "schema" : "https"}}
datacat_type = "airflow_connections"
oid = "860355e9-975f-4253-9421-1815e20c879b"
conn = get_connection_from_entry(data, datacat_type, oid)
self.assertEqual(conn.conn_id, f"{datacat_type}/{oid}-connection")
self.assertEqual(conn.conn_type, "http")
self.assertEqual(str(conn.port), "443")
self.assertEqual(json.loads(conn.get_extra())["a"], "b")
def test_get_wrong_metadata(self):
# metadata is a json string instead of a dict
data = {"name" : "foo", "url" : "bar", "metadata" : '{"a" : "b", "c" : "d", "conn_type" : "http", "port" : "443", "host" : "test.com", "schema" : "https"}'}
datacat_type = "airflow_connections"
oid = "860355e9-975f-4253-9421-1815e20c879b"
self.assertRaises(TypeError, get_connection_from_entry, data, datacat_type, oid)
class EntryTest(TestCase):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment