Skip to content
Snippets Groups Projects
Commit e159657b authored by Jedrzej Rybicki's avatar Jedrzej Rybicki
Browse files

refactor storage for python naming convention

parent 381d40d2
No related branches found
No related tags found
No related merge requests found
Pipeline #68761 passed
**__pycache__/
.vscode/*
# contains data for local tests
app/
site/
\ No newline at end of file
site/
......@@ -74,7 +74,7 @@ async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(
@app.get("/{location_data_type}")
async def list_datasets(location_data_type: LocationDataType):
# list id and name of every registered dataset for the specified type
return adapter.getList(location_data_type)
return adapter.get_list(location_data_type)
@app.put("/{location_data_type}")
......@@ -82,14 +82,14 @@ async def add_dataset(location_data_type: LocationDataType, dataset: LocationDat
token: str = Depends(oauth2_scheme)):
# register a new dataset, the response will contain the new dataset and its id
usr = "testuser"
return adapter.addNew(location_data_type, dataset, usr)
return adapter.add_new(location_data_type, dataset, usr)
@app.get("/{location_data_type}/{dataset_id}")
async def get_specific_dataset(location_data_type: LocationDataType, dataset_id: str):
# returns all information about a specific dataset, identified by id
try:
return adapter.getDetails(location_data_type, dataset_id)
return adapter.get_details(location_data_type, dataset_id)
except FileNotFoundError:
raise HTTPException(
status_code=404, detail='The provided id does not exist for this datatype.')
......@@ -102,7 +102,7 @@ async def update_specific_dataset(location_data_type: LocationDataType,
# update the information about a specific dataset, identified by id
usr = "testuser"
try:
return adapter.updateDetails(location_data_type, dataset_id, dataset, usr)
return adapter.update_details(location_data_type, dataset_id, dataset, usr)
except FileNotFoundError:
raise HTTPException(
status_code=404, detail='The provided id does not exist for this datatype.')
......
......@@ -8,74 +8,85 @@ from apiserver.config import ApiserverSettings
from typing import List
class StoredData:
actualData: LocationData
users: List[str]
def toDict(self):
return {'actualData' : self.actualData.__dict__, 'users' : self.users}
return {'actualData': self.actualData.__dict__, 'users': self.users}
# This stores LocationData via the StoredData Object as json files
# These Jsonfiles then contain the actualData, as well as the users with permissions for this LocationData
# all users have full permission to to anything with this dataobject, uncluding removing their own access (this might trigger a confirmation via the frontend, but this is not enforced via the api)
# IMPORTANT: The adapter does not check for authentication or authorization, it should only be invoked if the permissions have been checked
class JsonFileStorageAdapter(AbstractLocationDataStorageAdapter):
data_dir: str
""" This stores LocationData via the StoredData Object as json files
These Jsonfiles then contain the actualData, as well as the users with permissions
for this LocationData all users have full permission to to anything with
this dataobject, uncluding removing their own access (this might trigger a
confirmation via the frontend, but this is not enforced via the api)
IMPORTANT: The adapter does not check for authentication or authorization,
it should only be invoked if the permissions have been checked
"""
def __init__(self, settings: ApiserverSettings):
AbstractLocationDataStorageAdapter.__init__(self)
self.data_dir = settings.json_storage_path
if not (os.path.exists(self.data_dir) and os.path.isdir(self.data_dir)):
raise Exception('Data Directory \"' + self.data_dir + '\" does not exist.')
raise Exception(f"Data Directory {self.data_dir} does not exist.")
def getList(self, type: LocationDataType) -> List:
def get_list(self, type: LocationDataType) -> List:
localpath = os.path.join(self.data_dir, type.value)
if not (os.path.isdir(localpath)):
# This type has apparently not yet been used at all, create its directory and return an empty json file
# This type has apparently not yet been used at all,
# create its directory and return an empty json file
os.mkdir(localpath)
return []
else:
allFiles = [f for f in os.listdir(localpath) if os.path.isfile(os.path.join(localpath, f))]
# now each file has to be checked for its filename (= id) and the LocationData name (which is inside the json)
allFiles = [f for f in os.listdir(
localpath) if os.path.isfile(os.path.join(localpath, f))]
# now each file has to be checked for its filename (= oid)
# and the LocationData name (which is inside the json)
retList = []
for f in allFiles:
with open(os.path.join(localpath, f)) as file:
data = json.load(file)
retList.append({data['actualData']['name'] : f})
retList.append({data['actualData']['name']: f})
return retList
def addNew(self, type: LocationDataType, data: LocationData, usr: str):
def add_new(self, type: LocationDataType, data: LocationData, usr: str):
localpath = os.path.join(self.data_dir, type.value)
if not (os.path.isdir(localpath)):
# This type has apparently not yet been used at all, therefore we need to create its directory
# This type has apparently not yet been used at all,
# therefore we need to create its directory
os.mkdir(localpath)
# create a unique id, by randomly generating one, and re-choosing if it is already taken
id = str(uuid.uuid4())
while (os.path.exists(os.path.join(localpath, id))):
id = str(uuid.uuid4())
# create a unique oid, by randomly generating one,
# and re-choosing if it is already taken
oid = str(uuid.uuid4())
while (os.path.exists(os.path.join(localpath, oid))):
oid = str(uuid.uuid4())
toStore = StoredData()
toStore.users = [usr]
toStore.actualData = data
with open(os.path.join(localpath, id), 'w') as json_file:
with open(os.path.join(localpath, oid), 'w') as json_file:
json.dump(toStore.toDict(), json_file)
return {id : data}
return {oid: data}
def getDetails(self, type: LocationDataType, id: str):
def get_details(self, type: LocationDataType, oid: str):
localpath = os.path.join(self.data_dir, type.value)
fullpath = os.path.join(localpath, id)
fullpath = os.path.join(localpath, oid)
if not os.path.isfile(fullpath):
raise FileNotFoundError('The requested Object does not exist.')
raise FileNotFoundError(f"The requested object ({oid}) does not exist.")
with open(fullpath) as file:
data = json.load(file)
return data['actualData']
def updateDetails(self, type:LocationDataType, id:str, data: LocationData, usr: str):
def update_details(self, type: LocationDataType, oid: str, data: LocationData, usr: str):
localpath = os.path.join(self.data_dir, type.value)
fullpath = os.path.join(localpath, id)
fullpath = os.path.join(localpath, oid)
if not os.path.isfile(fullpath):
raise FileNotFoundError('The requested Object does not exist.')
raise FileNotFoundError(f"The requested object ({oid}) does not exist.")
toStore = StoredData()
toStore.actualData = data
......@@ -86,25 +97,25 @@ class JsonFileStorageAdapter(AbstractLocationDataStorageAdapter):
with open(fullpath, 'w') as file:
json.dump(toStore.toDict(), file)
return {id : data}
return {oid: data}
def delete(self, type:LocationDataType, id:str, usr: str):
def delete(self, type: LocationDataType, oid: str, usr: str):
localpath = os.path.join(self.data_dir, type.value)
fullpath = os.path.join(localpath, id)
fullpath = os.path.join(localpath, oid)
if not os.path.isfile(fullpath):
raise FileNotFoundError('The requested Object does not exist.')
raise FileNotFoundError(f"The requested object {oid} does not exist.")
os.remove(fullpath)
def getOwner(self, type: LocationDataType, id: str):
def get_owner(self, type: LocationDataType, oid: str):
raise NotImplementedError()
def checkPerm(self, type: LocationDataType, id: str, usr: str):
def check_perm(self, type: LocationDataType, oid: str, usr: str):
raise NotImplementedError()
def addPerm(self, type: LocationDataType, id: str, authUsr: str, newUser: str):
def add_perm(self, type: LocationDataType, oid: str, authUsr: str, newUser: str):
raise NotImplementedError()
def rmPerm(self, type: LocationDataType, id: str, usr: str, rmUser: str):
raise NotImplementedError()
\ No newline at end of file
def rm_perm(self, type: LocationDataType, oid: str, usr: str, rmUser: str):
raise NotImplementedError()
......@@ -5,59 +5,69 @@ from typing import Optional
from typing import Dict
from enum import Enum
class LocationDataType(Enum):
DATASET: str = 'dataset'
STORAGETARGET: str = 'storage_target'
DATASET = 'dataset'
STORAGETARGET = 'storage_target'
class LocationData(BaseModel):
name: str
url: str
metadata: Optional[Dict[str, str]]
#
'''
This is an abstract storage adapter for storing information about datasets, storage targets and similar things.
It can easily be expanded to also store other data (that has roughly similar metadata), just by expanding the LocationDataType Enum.
In general, all data is public. This means, that the adapter does not do any permission checking, except when explicitly asked via the checkPerm function.
The caller therefore has to manually decide when to check for permissions, and not call any function unless it is already authorized (or does not need any authorization).
The usr: str (the user id) that is required for several functions, is a unique and immutable string, that identifies the user. This can be a verified email or a user name.
The management of authentication etc. is done by the caller, this adapter assumes that the user id fulfills the criteria.
Permissions are stored as a list of user ids, and every id is authorized for full access.
'''
class AbstractLocationDataStorageAdapter:
'''
This is an abstract storage adapter for storing information about datasets,
storage targets and similar things. It can easily be expanded to also store
other data (that has roughly similar metadata), just by expanding
the `LocationDataType` Enum.
In general, all data is public. This means, that the adapter does not
do any permission checking, except when explicitly asked via the `checkPerm`
function. The caller therefore has to manually decide when to check for
permissions, and not call any function unless it is already authorized
(or does not need any authorization).
The usr: str (the user id) that is required for several functions, is
a unique and immutable string, that identifies the user. This can be a
verified email or a user name. The management of authentication etc. is
done by the caller, this adapter assumes that the user id fulfills the criteria.
Permissions are stored as a list of user ids, and every id is authorized for full access.
'''
# get a list of all LocationData Elements with the provided type, as pairs of {name : id}
def getList(self, type: LocationDataType):
def get_list(self, type: LocationDataType):
raise NotImplementedError()
# add a new element of the provided type, assign and return the id and the new data as {id : LocationData}
def addNew(self, type: LocationDataType, data: LocationData, usr: str):
def add_new(self, type: LocationDataType, data: LocationData, usr: str):
raise NotImplementedError()
# return the LocationData of the requested object (identified by id and type)
def getDetails(self, type: LocationDataType, id: str):
# return the LocationData of the requested object (identified by oid and type)
def get_details(self, type: LocationDataType, oid: str):
raise NotImplementedError()
# change the details of the requested object, return {id : newData}
def updateDetails(self, type:LocationDataType, id:str, data: LocationData, usr: str):
# change the details of the requested object, return {oid : newData}
def update_details(self, type: LocationDataType, oid: str, data: LocationData, usr: str):
raise NotImplementedError()
def delete(self, type:LocationDataType, id:str, usr: str):
def delete(self, type: LocationDataType, oid: str, usr: str):
raise NotImplementedError()
# return the owner of the requested object; if multiple owners are set, return them is a list
def getOwner(self, type: LocationDataType, id: str):
def get_owner(self, type: LocationDataType, oid: str):
raise NotImplementedError()
# check if the given user has permission to change the given object
def checkPerm(self, type: LocationDataType, id: str, usr: str):
def check_perm(self, type: LocationDataType, oid: str, usr: str):
raise NotImplementedError()
# add user to file perm
def addPerm(self, type: LocationDataType, id: str, usr: str):
def add_perm(self, type: LocationDataType, oid: str, usr: str):
raise NotImplementedError()
# remove user from file perm
def rmPerm(self, type: LocationDataType, id: str, usr: str):
def rm_perm(self, type: LocationDataType, oid: str, usr: str):
raise NotImplementedError()
......@@ -24,6 +24,6 @@ class SomeTests(unittest.TestCase):
def test_getList(self):
test_type = LocationDataType.DATASET
lst = self.store.getList(type=test_type)
lst = self.store.get_list(type=test_type)
self.assertEqual(lst, [], 'Id should not be none')
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment