diff --git a/apiserver/main.py b/apiserver/main.py index f527e4bde179be09e1305b4bc32dfaa5185df388..6dc2b6ff299eece2268162b26be73d448d1f1b44 100644 --- a/apiserver/main.py +++ b/apiserver/main.py @@ -1,10 +1,11 @@ """ Main module of data catalog api """ -import logging, os +import logging +import os from datetime import timedelta from enum import Enum -from typing import List, Tuple +from typing import List from fastapi import FastAPI, HTTPException, Request, status from fastapi.param_functions import Depends @@ -77,7 +78,7 @@ async def get_types(): """ return [{element.value: "/" + element.value} for element in LocationDataType] -@app.get("/{location_data_type}") +@app.get("/{location_data_type}") async def list_datasets(location_data_type: LocationDataType): """list id and name of every registered dataset for the specified type""" return adapter.get_list(location_data_type) @@ -90,7 +91,7 @@ async def get_specific_dataset(location_data_type: LocationDataType, dataset_id: raise HTTPException(status_code=400, detail="Invalid OID format!") return adapter.get_details(location_data_type, dataset_id) -@app.post("/{location_data_type}") +@app.post("/{location_data_type}") async def add_dataset(location_data_type: LocationDataType, dataset: LocationData, user: User = Depends(my_user)): @@ -124,4 +125,4 @@ async def not_found_handler(request: Request, ex: FileNotFoundError): _ =request.path_params.get('dataset_id', '') logging.error("File not found translated %s", ex) return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, - content={'message':f"Object does not exist"}) + content={'message':'Object does not exist'}) diff --git a/apiserver/storage/JsonFileStorageAdapter.py b/apiserver/storage/JsonFileStorageAdapter.py index b2197a2d1011fed08ec701f294e76deebc827967..ff7dcaefa34711bee11e983ce863896ab7f3e4c4 100644 --- a/apiserver/storage/JsonFileStorageAdapter.py +++ b/apiserver/storage/JsonFileStorageAdapter.py @@ -2,6 +2,7 @@ import json import os import uuid from typing import List +import logging from pydantic import BaseModel @@ -27,13 +28,14 @@ def get_unique_id(path: str) -> str: def verify_oid(oid: str, version=4): """ Ensure thatthe oid is formatted as a valid oid (i.e. UUID v4). - If it isn't, the corresponding request could theoretically be an attempted path traversal attack (or a regular typo). + If it isn't, the corresponding request could theoretically be + an attempted path traversal attack (or a regular typo). """ try: uuid_obj = uuid.UUID(oid, version=version) + return str(uuid_obj) == oid except: return False - return str(uuid_obj) == oid class JsonFileStorageAdapter(AbstractLocationDataStorageAdapter): """ This stores LocationData via the StoredData Object as json files @@ -64,9 +66,9 @@ class JsonFileStorageAdapter(AbstractLocationDataStorageAdapter): full_path = os.path.join(localpath, oid) common = os.path.commonprefix((os.path.realpath(full_path),os.path.realpath(self.data_dir))) if common != os.path.realpath(self.data_dir): - print(f"Escaping the data dir! {common} {full_path}") + logging.error(f"Escaping the data dir! {common} {full_path}") raise FileNotFoundError() - + if not os.path.isfile(full_path): raise FileNotFoundError( f"The requested object ({oid}) {full_path} does not exist.") diff --git a/tests/apiserver_tests/test_responsiveness.py b/tests/apiserver_tests/test_responsiveness.py index 7723ef48ab21d7feabf15745826ae88ed03fd1f5..a6fb7b31e3ee951a69786f16e8e5922c6e99d839 100644 --- a/tests/apiserver_tests/test_responsiveness.py +++ b/tests/apiserver_tests/test_responsiveness.py @@ -38,14 +38,14 @@ class NonAuthTests(unittest.TestCase): def test_token(self): rsp = self.client.post('/token', data={'username': 'foo', 'password': 'bar'}) - self.assertEqual(rsp.status_code, 401, 'Ath') + self.assertEqual(rsp.status_code, 401, 'Auth required') def test_get_non_existing(self): rsp = self.client.get(f'/dataset/{proper_uuid}') self.assertEqual(404, rsp.status_code) j = rsp.json() self.assertTrue('message' in j, f"{j} should contain message") - self.assertFalse('foo' in j['message'], f"error message should contain object id (foo)") + self.assertFalse('foo' in j['message'], f"error message should not contain object id (foo)") def test_get_invalid_oid(self): rsp = self.client.get('/dataset/invalid-uuid') diff --git a/tests/storage_tests/test_jsonbackend.py b/tests/storage_tests/test_jsonbackend.py index 2abc405e5e8109b927388e88458cb510c5917e37..52dc17c6cbbbcb55a8212cb9ab7b585bc95c3202 100644 --- a/tests/storage_tests/test_jsonbackend.py +++ b/tests/storage_tests/test_jsonbackend.py @@ -1,6 +1,6 @@ import unittest -from apiserver.storage.JsonFileStorageAdapter import JsonFileStorageAdapter, StoredData +from apiserver.storage.JsonFileStorageAdapter import JsonFileStorageAdapter, StoredData, verify_oid, get_unique_id from apiserver.storage import LocationDataType, LocationData from collections import namedtuple import os @@ -100,4 +100,10 @@ class SomeTests(unittest.TestCase): print(details) self.assertIsNone(details) - + def test_oid_veirfication(self): + oid = get_unique_id(path='/tmp/') + self.assertTrue(verify_oid(oid=oid)) + self.assertTrue(verify_oid(oid=oid.replace('5', '7'))) + self.assertFalse(verify_oid(oid='random strawberry')) + self.assertFalse(verify_oid(oid=None)) + self.assertFalse(verify_oid(oid=1))