From 3aaad1e106c0afd698766f6a27b3c85e015cf193 Mon Sep 17 00:00:00 2001 From: jrybicki-jsc <j.rybicki@fz-juelich.de> Date: Wed, 23 Jun 2021 09:03:25 +0200 Subject: [PATCH] oid verify test --- apiserver/main.py | 11 ++++++----- apiserver/storage/JsonFileStorageAdapter.py | 10 ++++++---- tests/apiserver_tests/test_responsiveness.py | 4 ++-- tests/storage_tests/test_jsonbackend.py | 10 ++++++++-- 4 files changed, 22 insertions(+), 13 deletions(-) diff --git a/apiserver/main.py b/apiserver/main.py index f527e4b..6dc2b6f 100644 --- a/apiserver/main.py +++ b/apiserver/main.py @@ -1,10 +1,11 @@ """ Main module of data catalog api """ -import logging, os +import logging +import os from datetime import timedelta from enum import Enum -from typing import List, Tuple +from typing import List from fastapi import FastAPI, HTTPException, Request, status from fastapi.param_functions import Depends @@ -77,7 +78,7 @@ async def get_types(): """ return [{element.value: "/" + element.value} for element in LocationDataType] -@app.get("/{location_data_type}") +@app.get("/{location_data_type}") async def list_datasets(location_data_type: LocationDataType): """list id and name of every registered dataset for the specified type""" return adapter.get_list(location_data_type) @@ -90,7 +91,7 @@ async def get_specific_dataset(location_data_type: LocationDataType, dataset_id: raise HTTPException(status_code=400, detail="Invalid OID format!") return adapter.get_details(location_data_type, dataset_id) -@app.post("/{location_data_type}") +@app.post("/{location_data_type}") async def add_dataset(location_data_type: LocationDataType, dataset: LocationData, user: User = Depends(my_user)): @@ -124,4 +125,4 @@ async def not_found_handler(request: Request, ex: FileNotFoundError): _ =request.path_params.get('dataset_id', '') logging.error("File not found translated %s", ex) return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, - content={'message':f"Object does not exist"}) + content={'message':'Object does not exist'}) diff --git a/apiserver/storage/JsonFileStorageAdapter.py b/apiserver/storage/JsonFileStorageAdapter.py index b2197a2..ff7dcae 100644 --- a/apiserver/storage/JsonFileStorageAdapter.py +++ b/apiserver/storage/JsonFileStorageAdapter.py @@ -2,6 +2,7 @@ import json import os import uuid from typing import List +import logging from pydantic import BaseModel @@ -27,13 +28,14 @@ def get_unique_id(path: str) -> str: def verify_oid(oid: str, version=4): """ Ensure thatthe oid is formatted as a valid oid (i.e. UUID v4). - If it isn't, the corresponding request could theoretically be an attempted path traversal attack (or a regular typo). + If it isn't, the corresponding request could theoretically be + an attempted path traversal attack (or a regular typo). """ try: uuid_obj = uuid.UUID(oid, version=version) + return str(uuid_obj) == oid except: return False - return str(uuid_obj) == oid class JsonFileStorageAdapter(AbstractLocationDataStorageAdapter): """ This stores LocationData via the StoredData Object as json files @@ -64,9 +66,9 @@ class JsonFileStorageAdapter(AbstractLocationDataStorageAdapter): full_path = os.path.join(localpath, oid) common = os.path.commonprefix((os.path.realpath(full_path),os.path.realpath(self.data_dir))) if common != os.path.realpath(self.data_dir): - print(f"Escaping the data dir! {common} {full_path}") + logging.error(f"Escaping the data dir! {common} {full_path}") raise FileNotFoundError() - + if not os.path.isfile(full_path): raise FileNotFoundError( f"The requested object ({oid}) {full_path} does not exist.") diff --git a/tests/apiserver_tests/test_responsiveness.py b/tests/apiserver_tests/test_responsiveness.py index 7723ef4..a6fb7b3 100644 --- a/tests/apiserver_tests/test_responsiveness.py +++ b/tests/apiserver_tests/test_responsiveness.py @@ -38,14 +38,14 @@ class NonAuthTests(unittest.TestCase): def test_token(self): rsp = self.client.post('/token', data={'username': 'foo', 'password': 'bar'}) - self.assertEqual(rsp.status_code, 401, 'Ath') + self.assertEqual(rsp.status_code, 401, 'Auth required') def test_get_non_existing(self): rsp = self.client.get(f'/dataset/{proper_uuid}') self.assertEqual(404, rsp.status_code) j = rsp.json() self.assertTrue('message' in j, f"{j} should contain message") - self.assertFalse('foo' in j['message'], f"error message should contain object id (foo)") + self.assertFalse('foo' in j['message'], f"error message should not contain object id (foo)") def test_get_invalid_oid(self): rsp = self.client.get('/dataset/invalid-uuid') diff --git a/tests/storage_tests/test_jsonbackend.py b/tests/storage_tests/test_jsonbackend.py index 2abc405..52dc17c 100644 --- a/tests/storage_tests/test_jsonbackend.py +++ b/tests/storage_tests/test_jsonbackend.py @@ -1,6 +1,6 @@ import unittest -from apiserver.storage.JsonFileStorageAdapter import JsonFileStorageAdapter, StoredData +from apiserver.storage.JsonFileStorageAdapter import JsonFileStorageAdapter, StoredData, verify_oid, get_unique_id from apiserver.storage import LocationDataType, LocationData from collections import namedtuple import os @@ -100,4 +100,10 @@ class SomeTests(unittest.TestCase): print(details) self.assertIsNone(details) - + def test_oid_veirfication(self): + oid = get_unique_id(path='/tmp/') + self.assertTrue(verify_oid(oid=oid)) + self.assertTrue(verify_oid(oid=oid.replace('5', '7'))) + self.assertFalse(verify_oid(oid='random strawberry')) + self.assertFalse(verify_oid(oid=None)) + self.assertFalse(verify_oid(oid=1)) -- GitLab