diff --git a/apiserver/__init__.py b/apiserver/__init__.py index 3fe300e2854a164124076a4d87784a1e83530387..7b99bd15ff5029325444340c734d74d436c96e3e 100644 --- a/apiserver/__init__.py +++ b/apiserver/__init__.py @@ -1 +1 @@ -from .main import app, settings \ No newline at end of file +from .main import app, settings diff --git a/apiserver/config/__init__.py b/apiserver/config/__init__.py index c7398456bb22e02a7a075df1afc2d0d908e94b15..4ba9ca0fd91ba67ea5a22e34113f6fa4c154fe34 100644 --- a/apiserver/config/__init__.py +++ b/apiserver/config/__init__.py @@ -1 +1 @@ -from .settings import ApiserverSettings \ No newline at end of file +from .settings import ApiserverSettings diff --git a/apiserver/main.py b/apiserver/main.py index 8fa5d418aca69a2a19efe33cc4834a7d4d1d1e4d..ff7e1e460bcaba3d29908cceece69a09d95e1e1d 100644 --- a/apiserver/main.py +++ b/apiserver/main.py @@ -48,11 +48,9 @@ settings = ApiserverSettings(_env_file=dotenv_file_path) if settings.encryption_key is not None and settings.encryption_key: log.debug("Using encrypted secrets backend.") - try: - adapter = EncryptedJsonFileStorageAdapter(settings) - except: - log.error("Using encrypetd secrets backend failed. Fallback to unencrypted.") - adapter = JsonFileStorageAdapter(settings) + # let the error break the server (clearly an encrypted backed is requested, + # fallback to non encrypted is not good) + adapter = EncryptedJsonFileStorageAdapter(settings) else: adapter = JsonFileStorageAdapter(settings) @@ -70,7 +68,6 @@ def my_auth(form_data: OAuth2PasswordRequestForm = Depends()): def secrets_required(func): @wraps(func) async def wrapper(*args, **kwargs): - print(f"And kwargs {kwargs}") user = kwargs.get('user', None) if user is None or not user.has_secrets_access: raise HTTPException(403) diff --git a/apiserver/security/user.py b/apiserver/security/user.py index af7e4c71a9aede64352d90038ccdb09dd82b6e7b..bb6d5d7d38b7001c8254c3e9b660f4a8752106ed 100644 --- a/apiserver/security/user.py +++ b/apiserver/security/user.py @@ -47,19 +47,19 @@ class Secret(BaseModel): class AbstractDBInterface(metaclass=abc.ABCMeta): # pragma: no cover @abc.abstractclassmethod - def list(self) -> List: + def list(cls) -> List: raise NotImplementedError() @abc.abstractclassmethod - def get(self, username: str): + def get(cls, username: str): raise NotImplementedError() @abc.abstractclassmethod - def add(self, user: UserInDB): + def add(cls, user: UserInDB): raise NotImplementedError() @abc.abstractclassmethod - def delete(self, username: str): + def delete(cls, username: str): raise NotImplementedError() diff --git a/apiserver/storage/EncryptedJsonFileStorageAdapter.py b/apiserver/storage/EncryptedJsonFileStorageAdapter.py index f580c32842571a4caf3df6b9cabbfeef90b8fc5b..95995a86b04c38b351a8ce7a4ffcd7bbb48ddae8 100644 --- a/apiserver/storage/EncryptedJsonFileStorageAdapter.py +++ b/apiserver/storage/EncryptedJsonFileStorageAdapter.py @@ -1,8 +1,7 @@ -from fastapi.exceptions import HTTPException -from .JsonFileStorageAdapter import JsonFileStorageAdapter, LocationDataType from cryptography.fernet import Fernet from apiserver.config.settings import ApiserverSettings +from .JsonFileStorageAdapter import JsonFileStorageAdapter, LocationDataType class EncryptedJsonFileStorageAdapter(JsonFileStorageAdapter): @@ -13,12 +12,12 @@ class EncryptedJsonFileStorageAdapter(JsonFileStorageAdapter): def decrypt(self, string: str): f = Fernet(self.encryption_key) return f.decrypt(string.encode()).decode("utf-8") - + def __init__(self, settings: ApiserverSettings) -> None: self.encryption_key = settings.encryption_key super().__init__(settings) - - + + def get_secret_values(self, n_type: LocationDataType, oid:str, usr: str): """ get all available secrets (key + value) for this object""" encrypted_dict = super().get_secret_values(n_type, oid, usr) @@ -38,4 +37,4 @@ class EncryptedJsonFileStorageAdapter(JsonFileStorageAdapter): def delete_secret(self, n_type: LocationDataType, oid:str, key: str, usr: str): """ delete and return the value of the requested secret for the given object""" - return self.decrypt(super().delete_secret(n_type, oid, key, usr)) \ No newline at end of file + return self.decrypt(super().delete_secret(n_type, oid, key, usr)) diff --git a/apiserver/storage/JsonFileStorageAdapter.py b/apiserver/storage/JsonFileStorageAdapter.py index 537e3f0f4f205d72b6c38fed09734c68842d5e5f..f4569ac73fccf681a787dad9d135aded1dbe3664 100644 --- a/apiserver/storage/JsonFileStorageAdapter.py +++ b/apiserver/storage/JsonFileStorageAdapter.py @@ -29,17 +29,6 @@ def get_unique_id(path: str) -> str: return oid -def verify_oid(oid: str, version=4): - """ Ensure thatthe oid is formatted as a valid oid (i.e. UUID v4). - If it isn't, the corresponding request could theoretically be - an attempted path traversal attack (or a regular typo). - """ - try: - uuid_obj = uuid.UUID(oid, version=version) - return str(uuid_obj) == oid - except: - return False - class JsonFileStorageAdapter(AbstractLocationDataStorageAdapter): """ This stores LocationData via the StoredData Object as json files @@ -85,12 +74,12 @@ class JsonFileStorageAdapter(AbstractLocationDataStorageAdapter): def __load_secrets(self, path: str) -> Dict[str, str]: if not os.path.isfile(path): return {} - with open(path, "r") as file: - return json.load(file) + with open(path, "r") as f: + return json.load(f) def __store_secrets(self, path: str, secrets: Dict[str, str]): - with open(path, "w") as file: - json.dump(secrets, file) + with open(path, "w") as f: + json.dump(secrets, f) def get_list(self, n_type: LocationDataType) -> List: local_path = self.__setup_path(n_type.value) @@ -137,10 +126,10 @@ class JsonFileStorageAdapter(AbstractLocationDataStorageAdapter): secrets_path = self.__get_secrets_path(n_type.value, oid) log.debug("Deleted object %s/%s by user '%s'.", n_type, oid, usr) os.remove(full_path) - if (os.path.isfile(secrets_path)): + if os.path.isfile(secrets_path): log.debug("Deleted secrets from object %s/%s by user '%s", n_type, oid, usr) os.remove(secrets_path) - + def list_secrets(self, n_type: LocationDataType, oid:str, usr: str): """ list all available secrets for this object""" secrets_path = self.__get_secrets_path(value=n_type.value, oid=oid) @@ -157,14 +146,14 @@ class JsonFileStorageAdapter(AbstractLocationDataStorageAdapter): secrets_path = self.__get_secrets_path(value=n_type.value, oid=oid) secrets = self.__load_secrets(secrets_path) secrets[key] = value - # TODO log + log.debug('User %s is updating secretes for %s', usr, oid) self.__store_secrets(secrets_path, secrets) def get_secret(self, n_type: LocationDataType, oid:str, key: str, usr: str): """ return the value of the requested secret for the given object""" secrets_path = self.__get_secrets_path(value=n_type.value, oid=oid) secrets = self.__load_secrets(secrets_path) - # TODO log + log.debug('User %s is retrieving secrets for %s', usr, oid) try: return secrets[key] except KeyError: @@ -177,6 +166,6 @@ class JsonFileStorageAdapter(AbstractLocationDataStorageAdapter): val = secrets.pop(key, None) if not val: raise HTTPException(404, f"Secret with key {key} does not exist for the object {n_type.value}/{oid}") - # TODO log + log.debug('User %s delete secret for %s', usr, oid) self.__store_secrets(secrets_path, secrets) - return val + return val diff --git a/apiserver/storage/__init__.py b/apiserver/storage/__init__.py index d14af4fbc9c5248d46f91925e463dfa793f5a9d0..5b7553ed48807cfba498f6a30d5d36f23acd524c 100644 --- a/apiserver/storage/__init__.py +++ b/apiserver/storage/__init__.py @@ -1,4 +1,4 @@ -from .JsonFileStorageAdapter import JsonFileStorageAdapter, verify_oid +from .JsonFileStorageAdapter import JsonFileStorageAdapter from .LocationStorage import LocationDataType, LocationData, AbstractLocationDataStorageAdapter diff --git a/tests/storage_tests/test_jsonbackend.py b/tests/storage_tests/test_jsonbackend.py index 84ce9e48020fbfa2c5adb7aaca233523855228a0..8b8352d958aa3d6297caf64117abaec783d31ecc 100644 --- a/tests/storage_tests/test_jsonbackend.py +++ b/tests/storage_tests/test_jsonbackend.py @@ -1,6 +1,6 @@ import unittest -from apiserver.storage.JsonFileStorageAdapter import JsonFileStorageAdapter, StoredData, verify_oid, get_unique_id +from apiserver.storage.JsonFileStorageAdapter import JsonFileStorageAdapter, StoredData, get_unique_id from apiserver.storage import LocationDataType, LocationData from collections import namedtuple import os @@ -102,13 +102,6 @@ class SomeTests(unittest.TestCase): print(details) self.assertIsNone(details) - def test_oid_veirfication(self): - oid = get_unique_id(path='/tmp/') - self.assertTrue(verify_oid(oid=oid)) - self.assertTrue(verify_oid(oid=oid.replace('5', '7'))) - self.assertFalse(verify_oid(oid='random strawberry')) - self.assertFalse(verify_oid(oid=None)) - self.assertFalse(verify_oid(oid=1)) def test_secrets_list_empty(self): self.secret_oid = self.store.add_new(self.secret_type, LocationData(name="secrets_test", url="secrets_url"), "")[0] @@ -158,4 +151,4 @@ class SomeTests(unittest.TestCase): def test_secrets_delete_nonexistent(self): self.secret_oid = self.store.add_new(self.secret_type, LocationData(name="secrets_test", url="secrets_url"), "")[0] self.assertRaises(HTTPException, self.store.delete_secret, self.secret_type, self.secret_oid, "somekey", "") - self.store.delete(self.secret_type, self.secret_oid, "") \ No newline at end of file + self.store.delete(self.secret_type, self.secret_oid, "")