Skip to content
Snippets Groups Projects
Commit e8f66c8c authored by Christian Boettcher's avatar Christian Boettcher
Browse files

Differentauth

parent dfa51908
No related branches found
No related tags found
1 merge request!6Differentauth
...@@ -221,7 +221,8 @@ test-testing: ...@@ -221,7 +221,8 @@ test-testing:
script: script:
- apt update && apt -y install curl - apt update && apt -y install curl
- echo "For now, this will be a basic health check i.e. GET / and check for 2xx code." - echo "For now, this will be a basic health check i.e. GET / and check for 2xx code."
- 'curl -f -H "Accept: application/json" $TESTING_URL' - echo "We are affected by the expried lets encrypt cert?"
- 'curl --insecure -I -H "Accept: application/json" $TESTING_URL'
test-production: test-production:
cache: {} cache: {}
......
...@@ -6,6 +6,7 @@ import os ...@@ -6,6 +6,7 @@ import os
from datetime import timedelta from datetime import timedelta
from enum import Enum from enum import Enum
from typing import List from typing import List
from functools import wraps
from fastapi import FastAPI, HTTPException, Request, status from fastapi import FastAPI, HTTPException, Request, status
from fastapi.param_functions import Depends from fastapi.param_functions import Depends
...@@ -66,6 +67,16 @@ def my_user(token=Depends(oauth2_scheme)): ...@@ -66,6 +67,16 @@ def my_user(token=Depends(oauth2_scheme)):
def my_auth(form_data: OAuth2PasswordRequestForm = Depends()): def my_auth(form_data: OAuth2PasswordRequestForm = Depends()):
return authenticate_user(userdb, form_data.username, form_data.password) return authenticate_user(userdb, form_data.username, form_data.password)
def secrets_required(func):
@wraps(func)
async def wrapper(*args, **kwargs):
print(f"And kwargs {kwargs}")
user = kwargs.get('user', None)
if user is None or not user.has_secrets_access:
raise HTTPException(403)
return await func(*args, **kwargs)
return wrapper
@app.get("/me", response_model=User) @app.get("/me", response_model=User)
async def read_users_me(user=Depends(my_user)): async def read_users_me(user=Depends(my_user)):
"""return information about the currently logged in user""" """return information about the currently logged in user"""
...@@ -89,6 +100,7 @@ async def login_for_access_token(user=Depends(my_auth)): ...@@ -89,6 +100,7 @@ async def login_for_access_token(user=Depends(my_auth)):
log.debug("Authenticed User: '%s' requested /token", user.username) log.debug("Authenticed User: '%s' requested /token", user.username)
return {"access_token": access_token, "token_type": "bearer"} return {"access_token": access_token, "token_type": "bearer"}
@app.get("/", response_model=List[dict[str, str]]) @app.get("/", response_model=List[dict[str, str]])
async def get_types(request: Request = None): async def get_types(request: Request = None):
""" """
...@@ -160,16 +172,14 @@ async def delete_specific_dataset(location_data_type: LocationDataType, ...@@ -160,16 +172,14 @@ async def delete_specific_dataset(location_data_type: LocationDataType,
return adapter.delete(location_data_type, str(dataset_id), user.username) return adapter.delete(location_data_type, str(dataset_id), user.username)
@app.get("/{location_data_type}/{dataset_id}/secrets") @app.get("/{location_data_type}/{dataset_id}/secrets")
@secrets_required
async def list_dataset_secrets(location_data_type: LocationDataType, async def list_dataset_secrets(location_data_type: LocationDataType,
dataset_id: UUID4, dataset_id: UUID4,
user: User = Depends(my_user)): user: User = Depends(my_user)):
"""list the secrets of a specific dataset""" """list the secrets of a specific dataset"""
if user.has_secrets_access:
log.debug("Authenticed User: '%s' listed the secrets of /%s/%s", user.username, location_data_type.value, dataset_id) log.debug("Authenticed User: '%s' listed the secrets of /%s/%s", user.username, location_data_type.value, dataset_id)
return adapter.list_secrets(location_data_type, dataset_id, user) return adapter.list_secrets(location_data_type, dataset_id, user)
raise HTTPException(403)
@app.get("/{location_data_type}/{dataset_id}/secrets_values") @app.get("/{location_data_type}/{dataset_id}/secrets_values")
async def list_dataset_secrets(location_data_type: LocationDataType, async def list_dataset_secrets(location_data_type: LocationDataType,
dataset_id: UUID4, dataset_id: UUID4,
...@@ -182,39 +192,35 @@ async def list_dataset_secrets(location_data_type: LocationDataType, ...@@ -182,39 +192,35 @@ async def list_dataset_secrets(location_data_type: LocationDataType,
raise HTTPException(403) raise HTTPException(403)
@app.get("/{location_data_type}/{dataset_id}/secrets/{key}") @app.get("/{location_data_type}/{dataset_id}/secrets/{key}")
@secrets_required
async def get_dataset_secret(location_data_type: LocationDataType, async def get_dataset_secret(location_data_type: LocationDataType,
dataset_id: UUID4, dataset_id: UUID4,
key: str, key: str,
user: User = Depends(my_user)): user: User = Depends(my_user)):
"""get the secret of a specific dataset""" """get the secret of a specific dataset"""
if user.has_secrets_access:
log.debug("Authenticed User: '%s' listed the secret %s of /%s/%s", user.username, key, location_data_type.value, dataset_id) log.debug("Authenticed User: '%s' listed the secret %s of /%s/%s", user.username, key, location_data_type.value, dataset_id)
return adapter.get_secret(location_data_type, dataset_id, key, user) return adapter.get_secret(location_data_type, dataset_id, key, user)
raise HTTPException(403)
@app.post("/{location_data_type}/{dataset_id}/secrets") @app.post("/{location_data_type}/{dataset_id}/secrets")
@secrets_required
async def add_update_dataset_secret(location_data_type: LocationDataType, async def add_update_dataset_secret(location_data_type: LocationDataType,
dataset_id: UUID4, dataset_id: UUID4,
secret: Secret, secret: Secret,
user: User = Depends(my_user)): user: User = Depends(my_user)):
"""add or update a secrets to a specific dataset""" """add or update a secrets to a specific dataset"""
if user.has_secrets_access:
log.debug("Authenticed User: '%s' added or updated the secret %s of /%s/%s", user.username, secret.key, location_data_type.value, dataset_id) log.debug("Authenticed User: '%s' added or updated the secret %s of /%s/%s", user.username, secret.key, location_data_type.value, dataset_id)
return adapter.add_update_secret(location_data_type, dataset_id, secret.key, secret.secret, user) return adapter.add_update_secret(location_data_type, dataset_id, secret.key, secret.secret, user)
raise HTTPException(403)
@app.delete("/{location_data_type}/{dataset_id}/secrets/{key}") @app.delete("/{location_data_type}/{dataset_id}/secrets/{key}")
@secrets_required
async def get_dataset_secrets(location_data_type: LocationDataType, async def get_dataset_secrets(location_data_type: LocationDataType,
dataset_id: UUID4, dataset_id: UUID4,
key: str, key: str,
user: User = Depends(my_user)): user: User = Depends(my_user)):
"""delete a secret from a specific dataset""" """delete a secret from a specific dataset"""
if user.has_secrets_access:
log.debug("Authenticed User: '%s' deleted the secret %s from /%s/%s", user.username, key, location_data_type.value, dataset_id) log.debug("Authenticed User: '%s' deleted the secret %s from /%s/%s", user.username, key, location_data_type.value, dataset_id)
return adapter.delete_secret(location_data_type, dataset_id, key, user) return adapter.delete_secret(location_data_type, dataset_id, key, user)
raise HTTPException(403)
@app.exception_handler(FileNotFoundError) @app.exception_handler(FileNotFoundError)
async def not_found_handler(request: Request, ex: FileNotFoundError): async def not_found_handler(request: Request, ex: FileNotFoundError):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment