diff --git a/.gitignore b/.gitignore index 7e667c3ca958a32c9a5fa29d9bb32db9aff3554e..7e4285689509ebc7cf6763c8611759614ad5d566 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ dist/ __pycache__/ *.egg-info/ +coverage_html_report/ # generated files .coverage diff --git a/tests/test_secrets.py b/tests/test_secrets.py index 2c1611ef0e6f71cc20c6ca173cbcd84ef05286bb..489f6be9e427b8e58ec614697a5588bbe776e123 100644 --- a/tests/test_secrets.py +++ b/tests/test_secrets.py @@ -1,11 +1,11 @@ from unittest import TestCase -import os +import os, json from dotenv import load_dotenv from datacat_integration.secrets import DataCatConnectionWithSecrets, get_connection_with_secrets_from_entry, DatacatSecretsBackend -class TestSecretsBackenbd(TestCase): +class TestSecretsBackend(TestCase): def setUp(self): load_dotenv("tests/testing-authentication.env", verbose=False) # does nothing if file is not present self.backend = DatacatSecretsBackend() @@ -25,3 +25,40 @@ class TestSecretsBackenbd(TestCase): self.assertEqual(conn.extra_dejson['some_extra'], "secret_12345") self.assertEqual(conn.extra_dejson['some_public_extra'], "12345") self.assertEqual(conn.extra_dejson['some_other_extra_to_be_overwritten_by_secret'], "secret_67890") + + +class TestSecretsConnection(TestCase): + def setUp(self): + load_dotenv("tests/testing-authentication.env", verbose=False) # does nothing if file is not present + + self.url = os.getenv('DATACAT_URL', "https://zam10036.zam.kfa-juelich.de") + self.user = os.getenv('DATACAT_LOGIN', "dls-testing") + self.password = os.getenv('DATACAT_PASSWORD') + self.assertIsNotNone(self.url) + self.assertIsNotNone(self.user) + self.assertIsNotNone(self.password) + self.connection = DataCatConnectionWithSecrets(self.url, self.user, self.password) + + self.oid = "860355e9-975f-4253-9421-1815e20c879b" + self.conn_type = "airflow_connections" + self.keys = ["login", "password", "some_extra", "some_other_extra", "some_other_extra_to_be_overwritten_by_secret"] + self.key_value_map = { "login": "foo", "password": "bar", "some_extra": "secret_12345", "some_other_extra": "secret_67890", + "some_other_extra_to_be_overwritten_by_secret": "secret_67890" } + + def test_get_keys(self): + keys = json.loads(self.connection.get_secrets_keys(self.conn_type, self.oid)) + self.assertListEqual(keys, self.keys) + + def test_get_key_value(self): + k_v_map = json.loads(self.connection.get_all_secret_key_value(self.conn_type, self.oid)) + self.assertDictEqual(k_v_map, self.key_value_map) + + def test_get_single_value(self): + for key in self.keys: + self.assertEqual(self.key_value_map[key], json.loads(self.connection.get_secret_value(self.conn_type, self.oid, key))) + + + def test_connection_errore(self): + self.assertRaises(ConnectionError, self.connection.get_secrets_keys, self.conn_type, "invalid_oid") + self.assertRaises(ConnectionError, self.connection.get_all_secret_key_value, self.conn_type, "invalid_oid") + self.assertRaises(ConnectionError, self.connection.get_secret_value, self.conn_type, "invalid_oid", "fake_key")