Skip to content
Snippets Groups Projects
Commit fc05f0dc authored by Jedrzej Rybicki's avatar Jedrzej Rybicki
Browse files

code formating, check oid removal (we dont use it anymore?)

parent a46d4afc
Branches
Tags
No related merge requests found
Pipeline #84791 passed
...@@ -48,11 +48,9 @@ settings = ApiserverSettings(_env_file=dotenv_file_path) ...@@ -48,11 +48,9 @@ settings = ApiserverSettings(_env_file=dotenv_file_path)
if settings.encryption_key is not None and settings.encryption_key: if settings.encryption_key is not None and settings.encryption_key:
log.debug("Using encrypted secrets backend.") log.debug("Using encrypted secrets backend.")
try: # let the error break the server (clearly an encrypted backed is requested,
# fallback to non encrypted is not good)
adapter = EncryptedJsonFileStorageAdapter(settings) adapter = EncryptedJsonFileStorageAdapter(settings)
except:
log.error("Using encrypetd secrets backend failed. Fallback to unencrypted.")
adapter = JsonFileStorageAdapter(settings)
else: else:
adapter = JsonFileStorageAdapter(settings) adapter = JsonFileStorageAdapter(settings)
...@@ -70,7 +68,6 @@ def my_auth(form_data: OAuth2PasswordRequestForm = Depends()): ...@@ -70,7 +68,6 @@ def my_auth(form_data: OAuth2PasswordRequestForm = Depends()):
def secrets_required(func): def secrets_required(func):
@wraps(func) @wraps(func)
async def wrapper(*args, **kwargs): async def wrapper(*args, **kwargs):
print(f"And kwargs {kwargs}")
user = kwargs.get('user', None) user = kwargs.get('user', None)
if user is None or not user.has_secrets_access: if user is None or not user.has_secrets_access:
raise HTTPException(403) raise HTTPException(403)
......
...@@ -47,19 +47,19 @@ class Secret(BaseModel): ...@@ -47,19 +47,19 @@ class Secret(BaseModel):
class AbstractDBInterface(metaclass=abc.ABCMeta): # pragma: no cover class AbstractDBInterface(metaclass=abc.ABCMeta): # pragma: no cover
@abc.abstractclassmethod @abc.abstractclassmethod
def list(self) -> List: def list(cls) -> List:
raise NotImplementedError() raise NotImplementedError()
@abc.abstractclassmethod @abc.abstractclassmethod
def get(self, username: str): def get(cls, username: str):
raise NotImplementedError() raise NotImplementedError()
@abc.abstractclassmethod @abc.abstractclassmethod
def add(self, user: UserInDB): def add(cls, user: UserInDB):
raise NotImplementedError() raise NotImplementedError()
@abc.abstractclassmethod @abc.abstractclassmethod
def delete(self, username: str): def delete(cls, username: str):
raise NotImplementedError() raise NotImplementedError()
......
from fastapi.exceptions import HTTPException
from .JsonFileStorageAdapter import JsonFileStorageAdapter, LocationDataType
from cryptography.fernet import Fernet from cryptography.fernet import Fernet
from apiserver.config.settings import ApiserverSettings from apiserver.config.settings import ApiserverSettings
from .JsonFileStorageAdapter import JsonFileStorageAdapter, LocationDataType
class EncryptedJsonFileStorageAdapter(JsonFileStorageAdapter): class EncryptedJsonFileStorageAdapter(JsonFileStorageAdapter):
......
...@@ -29,17 +29,6 @@ def get_unique_id(path: str) -> str: ...@@ -29,17 +29,6 @@ def get_unique_id(path: str) -> str:
return oid return oid
def verify_oid(oid: str, version=4):
""" Ensure thatthe oid is formatted as a valid oid (i.e. UUID v4).
If it isn't, the corresponding request could theoretically be
an attempted path traversal attack (or a regular typo).
"""
try:
uuid_obj = uuid.UUID(oid, version=version)
return str(uuid_obj) == oid
except:
return False
class JsonFileStorageAdapter(AbstractLocationDataStorageAdapter): class JsonFileStorageAdapter(AbstractLocationDataStorageAdapter):
""" This stores LocationData via the StoredData Object as json files """ This stores LocationData via the StoredData Object as json files
...@@ -85,12 +74,12 @@ class JsonFileStorageAdapter(AbstractLocationDataStorageAdapter): ...@@ -85,12 +74,12 @@ class JsonFileStorageAdapter(AbstractLocationDataStorageAdapter):
def __load_secrets(self, path: str) -> Dict[str, str]: def __load_secrets(self, path: str) -> Dict[str, str]:
if not os.path.isfile(path): if not os.path.isfile(path):
return {} return {}
with open(path, "r") as file: with open(path, "r") as f:
return json.load(file) return json.load(f)
def __store_secrets(self, path: str, secrets: Dict[str, str]): def __store_secrets(self, path: str, secrets: Dict[str, str]):
with open(path, "w") as file: with open(path, "w") as f:
json.dump(secrets, file) json.dump(secrets, f)
def get_list(self, n_type: LocationDataType) -> List: def get_list(self, n_type: LocationDataType) -> List:
local_path = self.__setup_path(n_type.value) local_path = self.__setup_path(n_type.value)
...@@ -137,7 +126,7 @@ class JsonFileStorageAdapter(AbstractLocationDataStorageAdapter): ...@@ -137,7 +126,7 @@ class JsonFileStorageAdapter(AbstractLocationDataStorageAdapter):
secrets_path = self.__get_secrets_path(n_type.value, oid) secrets_path = self.__get_secrets_path(n_type.value, oid)
log.debug("Deleted object %s/%s by user '%s'.", n_type, oid, usr) log.debug("Deleted object %s/%s by user '%s'.", n_type, oid, usr)
os.remove(full_path) os.remove(full_path)
if (os.path.isfile(secrets_path)): if os.path.isfile(secrets_path):
log.debug("Deleted secrets from object %s/%s by user '%s", n_type, oid, usr) log.debug("Deleted secrets from object %s/%s by user '%s", n_type, oid, usr)
os.remove(secrets_path) os.remove(secrets_path)
...@@ -157,14 +146,14 @@ class JsonFileStorageAdapter(AbstractLocationDataStorageAdapter): ...@@ -157,14 +146,14 @@ class JsonFileStorageAdapter(AbstractLocationDataStorageAdapter):
secrets_path = self.__get_secrets_path(value=n_type.value, oid=oid) secrets_path = self.__get_secrets_path(value=n_type.value, oid=oid)
secrets = self.__load_secrets(secrets_path) secrets = self.__load_secrets(secrets_path)
secrets[key] = value secrets[key] = value
# TODO log log.debug('User %s is updating secretes for %s', usr, oid)
self.__store_secrets(secrets_path, secrets) self.__store_secrets(secrets_path, secrets)
def get_secret(self, n_type: LocationDataType, oid:str, key: str, usr: str): def get_secret(self, n_type: LocationDataType, oid:str, key: str, usr: str):
""" return the value of the requested secret for the given object""" """ return the value of the requested secret for the given object"""
secrets_path = self.__get_secrets_path(value=n_type.value, oid=oid) secrets_path = self.__get_secrets_path(value=n_type.value, oid=oid)
secrets = self.__load_secrets(secrets_path) secrets = self.__load_secrets(secrets_path)
# TODO log log.debug('User %s is retrieving secrets for %s', usr, oid)
try: try:
return secrets[key] return secrets[key]
except KeyError: except KeyError:
...@@ -177,6 +166,6 @@ class JsonFileStorageAdapter(AbstractLocationDataStorageAdapter): ...@@ -177,6 +166,6 @@ class JsonFileStorageAdapter(AbstractLocationDataStorageAdapter):
val = secrets.pop(key, None) val = secrets.pop(key, None)
if not val: if not val:
raise HTTPException(404, f"Secret with key {key} does not exist for the object {n_type.value}/{oid}") raise HTTPException(404, f"Secret with key {key} does not exist for the object {n_type.value}/{oid}")
# TODO log log.debug('User %s delete secret for %s', usr, oid)
self.__store_secrets(secrets_path, secrets) self.__store_secrets(secrets_path, secrets)
return val return val
from .JsonFileStorageAdapter import JsonFileStorageAdapter, verify_oid from .JsonFileStorageAdapter import JsonFileStorageAdapter
from .LocationStorage import LocationDataType, LocationData, AbstractLocationDataStorageAdapter from .LocationStorage import LocationDataType, LocationData, AbstractLocationDataStorageAdapter
......
import unittest import unittest
from apiserver.storage.JsonFileStorageAdapter import JsonFileStorageAdapter, StoredData, verify_oid, get_unique_id from apiserver.storage.JsonFileStorageAdapter import JsonFileStorageAdapter, StoredData, get_unique_id
from apiserver.storage import LocationDataType, LocationData from apiserver.storage import LocationDataType, LocationData
from collections import namedtuple from collections import namedtuple
import os import os
...@@ -102,13 +102,6 @@ class SomeTests(unittest.TestCase): ...@@ -102,13 +102,6 @@ class SomeTests(unittest.TestCase):
print(details) print(details)
self.assertIsNone(details) self.assertIsNone(details)
def test_oid_veirfication(self):
oid = get_unique_id(path='/tmp/')
self.assertTrue(verify_oid(oid=oid))
self.assertTrue(verify_oid(oid=oid.replace('5', '7')))
self.assertFalse(verify_oid(oid='random strawberry'))
self.assertFalse(verify_oid(oid=None))
self.assertFalse(verify_oid(oid=1))
def test_secrets_list_empty(self): def test_secrets_list_empty(self):
self.secret_oid = self.store.add_new(self.secret_type, LocationData(name="secrets_test", url="secrets_url"), "")[0] self.secret_oid = self.store.add_new(self.secret_type, LocationData(name="secrets_test", url="secrets_url"), "")[0]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment