diff --git a/requirements.txt b/requirements.txt
index 91a26ee722ac3df843c8acd97bad653cbec8e3e3..8743ff01b00bfecd4c2e9aeedace366d444260f8 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,6 +7,7 @@ urllib3>=1.26.6
 apache-airflow-providers-ssh
 apache-airflow-providers-http
 apache-airflow-providers-sftp
+apache-airflow-providers-hashicorp
 python-dotenv
 --index-url https://gitlab.jsc.fz-juelich.de/api/v4/projects/4405/packages/pypi/simple
 airflow-datacat-integration
\ No newline at end of file
diff --git a/src/datacat_integration/connection.py b/src/datacat_integration/connection.py
index f936305af1120f30a2b401a1dd1c6b3542581eb8..f390eabe03540da0d86311a1ebf01a426b2e326b 100644
--- a/src/datacat_integration/connection.py
+++ b/src/datacat_integration/connection.py
@@ -22,7 +22,7 @@ def get_connection_from_entry(data: Dict[str, Any], datacat_type: str, oid: str)
     # set all remaining metadata as extra
     extra = {}
     for key in metadata.keys():
-        if key in ['conn_type', 'host', 'port', 'schema']:
+        if key in ['conn_type', 'host', 'port', 'schema', 'conn_oid']:
             continue
         extra[key] = metadata[key]
 
diff --git a/src/datacat_integration/secrets.py b/src/datacat_integration/secrets.py
index 065ae5e3c617625db64318602dd10c81e160a735..a5ceec38764251f86163cf99291c0a8ce1c3264e 100644
--- a/src/datacat_integration/secrets.py
+++ b/src/datacat_integration/secrets.py
@@ -8,7 +8,7 @@ import uuid
 
 from datacat_integration.connection import DataCatConnection, get_connection_from_entry
 
-connection_backend_type = "airflow_connections"
+default_connection_backend_type = "airflow_connections"
 
 log = logging.getLogger(__name__)
 
@@ -21,7 +21,7 @@ def validate_uuid(uuid_to_test: str):
     return str(uuid_obj) == uuid_to_test
 
 def get_connection_with_secrets_from_entry(data: Dict[str, Any], secrets: Dict[str, str] , datacat_type: str, oid: str):
-    """returns an aiflow connection from the data provided in the datacat entry and the secrets."""
+    """returns an airflow connection from the data provided in the datacat entry and the secrets."""
     conn = get_connection_from_entry(data, datacat_type, oid)
     conn.password = secrets.get('password')
     conn.login = secrets.get('login')
@@ -96,11 +96,34 @@ class DatacatSecretsBackend(BaseSecretsBackend):
         
         log.debug(f"Get connection from datacat secrets backend: {conn_id}")
 
-        if not validate_uuid(conn_id):
+        # break up conn_id into connection_type and uuid and update documentation for that
+        # if no connection_type is given, assume default connection_type
+        split_arr = conn_id.split('/')
+
+        if len(split_arr) == 1:
+            connection_backend_type = default_connection_backend_type
+            base_conn_oid = split_arr[0]
+        elif len(split_arr) == 2:
+            connection_backend_type = split_arr[0]
+            base_conn_oid = split_arr[1]
+        else:
+            return None # connection with multiple "/" in the id is not possible
+
+        if not validate_uuid(base_conn_oid):
             return None # can not be a connection in the datacat, immediatly return None to allow airflow to look elsewhere
 
         secrets_conn = DataCatConnectionWithSecrets(self.url, self.user, self.password)
-        data: Dict[str,str] = json.loads(secrets_conn.get_entry(connection_backend_type, conn_id))
-        secrets: Dict[str,str] = json.loads(secrets_conn.get_all_secret_key_value(connection_backend_type, conn_id))
-        conn = get_connection_with_secrets_from_entry(data, secrets, connection_backend_type, conn_id)
+        data: Dict[str,Any] = json.loads(secrets_conn.get_entry(connection_backend_type, base_conn_oid))
+        secrets: Dict[str,str] = json.loads(secrets_conn.get_all_secret_key_value(connection_backend_type, base_conn_oid))
+
+        linked_conn_oid = data['metadata'].get('conn_oid', None)
+        if linked_conn_oid is not None:
+            # get metadata from linked connection as conn_md
+            linked_connection_data = json.loads(secrets_conn.get_entry('airflow_connections', linked_conn_oid))
+            # merge conn_md and data (data wins conflicts with conn_md)
+            linked_connection_data['metadata'].update(data['metadata'])
+            data['metadata'] = linked_connection_data['metadata']
+        
+
+        conn = get_connection_with_secrets_from_entry(data, secrets, connection_backend_type, base_conn_oid)
         return conn
\ No newline at end of file
diff --git a/tests/test_connection.py b/tests/test_connection.py
index c1fd15ec8ef47677e7ada660818cfa036e4f38f6..ce3e8c6d9c53a33c74ccda18c72670fd6ea093f5 100644
--- a/tests/test_connection.py
+++ b/tests/test_connection.py
@@ -13,7 +13,7 @@ class GetConnectionTest(TestCase):
     def test_get_connection(self):
         data = {"name" : "foo", "url" : "bar", "metadata" : {"a" : "b", "c" : "d", "conn_type" : "http", "port" : "443", "host" : "test.com", "schema" : "https"}}
         datacat_type = "airflow_connections"
-        oid = "860355e9-975f-4253-9421-1815e20c879b"
+        oid = "1b02cde7-f079-4053-93a7-9ac299221a01"
 
         conn = get_connection_from_entry(data, datacat_type, oid)
 
@@ -26,7 +26,7 @@ class GetConnectionTest(TestCase):
         # metadata is a json string instead of a dict
         data = {"name" : "foo", "url" : "bar", "metadata" : '{"a" : "b", "c" : "d", "conn_type" : "http", "port" : "443", "host" : "test.com", "schema" : "https"}'}
         datacat_type = "airflow_connections"
-        oid = "860355e9-975f-4253-9421-1815e20c879b"
+        oid = "1b02cde7-f079-4053-93a7-9ac299221a01"
 
         self.assertRaises(TypeError, get_connection_from_entry, data, datacat_type, oid)
 
@@ -81,7 +81,7 @@ class ConnectionTest(TestCase):
     # may fail in future when the testing instance is moved or redeployed - test may need to be updated in that case, since the tested entry may no longer exist
     def test_get_entry(self):
         connection = DataCatConnection(self.url, self.user, self.password)
-        entry_json = connection.get_entry("storage_target", "7aa3877e-2860-4c65-8d48-3e080ceedca2")
+        entry_json = connection.get_entry("storage_target", "57fac614-a664-49b6-9f47-1bde7a46486c")
         entry = DataCatalogEntry.from_json(entry_json)
         self.assertEqual(entry.name, "Yet Another Test Object")
         self.assertEqual(entry.url, "1234")
@@ -101,7 +101,7 @@ class ConnectionTest(TestCase):
     def test_list_entries(self):
         connection = DataCatConnection(self.url, self.user, self.password)
         entries = connection.list_type("storage_target")
-        self.assertIn("6642e0be-b642-499d-a997-d76b8e350387", [e[1] for e in entries])
+        self.assertIn("297e21dc-27f2-4f09-98df-8c0e8b78a5a4", [e[1] for e in entries])
         self.assertIn("DLS-Testing-Connection-Object 1", [e[0] for e in entries])
         self.assertGreaterEqual(len(entries), 5)
 
diff --git a/tests/test_secrets.py b/tests/test_secrets.py
index 5fb51cb224abc67558e24fc8a04fb4b30865fc28..a275b6977a68cb6220dc425a04e79c1a81cff927 100644
--- a/tests/test_secrets.py
+++ b/tests/test_secrets.py
@@ -19,8 +19,8 @@ class TestSecretsBackend(TestCase):
         self.assertIsNotNone(self.backend.password)
     
     def test_get_connection_from_oid(self):
-        conn = self.backend.get_connection("860355e9-975f-4253-9421-1815e20c879b")
-        self.assertEqual(conn.conn_id, "airflow_connections/860355e9-975f-4253-9421-1815e20c879b-connection")
+        conn = self.backend.get_connection("1b02cde7-f079-4053-93a7-9ac299221a01")
+        self.assertEqual(conn.conn_id, "airflow_connections/1b02cde7-f079-4053-93a7-9ac299221a01-connection")
         self.assertEqual(conn.conn_type, "http")
         self.assertEqual(conn.port, "443")
         self.assertEqual(conn.extra_dejson['some_extra'], "secret_12345")
@@ -37,6 +37,26 @@ class TestSecretsBackend(TestCase):
     def test_None_on_invalid_uuid(self):
         self.assertIsNone(self.backend.get_connection("invalid_uuid"))
 
+    def test_type_and_oid(self):
+        conn = self.backend.get_connection("storage_target/57fac614-a664-49b6-9f47-1bde7a46486c")
+        self.assertEqual(conn.conn_id, "storage_target/57fac614-a664-49b6-9f47-1bde7a46486c-connection")
+        self.assertEqual(conn.extra_dejson['1'], '2')
+
+    def test_no_oid(self):
+        self.assertIsNone(self.backend.get_connection(""))
+
+    def test_with_connection_oid_property(self):
+        conn = self.backend.get_connection("dataset/7f228d93-0350-419d-874e-926ac8b7e03d")
+        self.assertEqual(conn.conn_type, "http")
+        self.assertEqual(conn.port, '443')
+        self.assertEqual(conn.extra_dejson['some_public_extra'], "12345")
+
+    def test_multiple_slash_oid(self):
+        self.assertIsNone(self.backend.get_connection("a/b/c/d"))
+        self.assertIsNone(self.backend.get_connection("a/b/c"))
+        self.assertIsNone(self.backend.get_connection("a/b/c/d/3"))
+        self.assertIsNone(self.backend.get_connection("1234/5678/9101112"))
+
 
 class TestSecretsConnection(TestCase):
     def setUp(self):
@@ -50,7 +70,7 @@ class TestSecretsConnection(TestCase):
         self.assertIsNotNone(self.password)
         self.connection = DataCatConnectionWithSecrets(self.url, self.user, self.password)
 
-        self.oid = "860355e9-975f-4253-9421-1815e20c879b"
+        self.oid = "1b02cde7-f079-4053-93a7-9ac299221a01"
         self.conn_type = "airflow_connections" 
         self.keys = ["login", "password", "some_extra", "some_other_extra", "some_other_extra_to_be_overwritten_by_secret"]
         self.key_value_map = { "login": "foo", "password": "bar", "some_extra": "secret_12345", "some_other_extra": "secret_67890",