Skip to content
Snippets Groups Projects
Commit 67afb0ac authored by Jedrzej Rybicki's avatar Jedrzej Rybicki
Browse files

authenticte and current user tests

parent 6e6e78eb
No related branches found
No related tags found
No related merge requests found
Pipeline #69464 passed
......@@ -114,25 +114,18 @@ def get_password_hash(password):
def authenticate_user(userdb: AbstractDBInterface, username: str, password: str):
user: UserInDB = get_user(userdb, username)
user: UserInDB = userdb.get(username)
if user and verify_password(password, user.hashed_password):
return user
return None
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
def create_access_token(data: dict, expires_delta: Optional[timedelta] = timedelta(minutes=15)):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
expire = datetime.utcnow() + expires_delta
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def get_user(db: AbstractDBInterface, username: str):
return db.get(username)
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
......@@ -144,7 +137,7 @@ def get_current_user(token: str, userdb: AbstractDBInterface):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if (username is None) or ((user:=get_user(userdb, username)) is None):
if (username is None) or ((user:=userdb.get(username)) is None):
raise credentials_exception
return user
......
......@@ -36,3 +36,11 @@ class NonAuthTests(unittest.TestCase):
def test_token(self):
rsp = self.client.post('/token', data={'username': 'foo', 'password': 'bar'})
self.assertEqual(rsp.status_code, 401, 'Ath')
def test_get_non_existing(self):
rsp = self.client.get('/dataset/foo')
self.assertEqual(404, rsp.status_code)
j = rsp.json()
self.assertTrue('message' in j, f"{j} should contain message")
self.assertTrue('foo' in j['message'], f"{j} should contain object id (foo)")
import unittest
from apiserver.security import User, JsonDBInterface, UserInDB
from apiserver.security import User, JsonDBInterface, UserInDB, authenticate_user, get_current_user
from apiserver.config import ApiserverSettings
from fastapi import HTTPException
from collections import namedtuple
import os
import pathlib
import shutil
import random
from unittest.mock import Mock, patch
class UserTests(unittest.TestCase):
......@@ -77,7 +79,22 @@ class UserTests(unittest.TestCase):
self.userdb.add(UserInDB(username=f"user_{n}", email='jo@go.com', hashed_password=f"{random.randint(0,200)}"))
self.assertEqual(len(self.userdb.list()), 25)
def test_not_authenticate_user(self):
mock = Mock(spec=JsonDBInterface)
mock.get.return_value = None
user = authenticate_user(userdb=mock, username='foo', password='pass')
self.assertIsNone(user)
mock.get.assert_called_with('foo')
def test_authenticate_user(self):
mock = Mock(spec=JsonDBInterface)
mock.get.return_value(UserInDB(username='foo', email='bar@o.w', hashed_password='passed'))
with patch('apiserver.security.user.verify_password') as vp:
user = authenticate_user(userdb=mock, username='foo', password='passed')
self.assertIsNotNone(user)
vp.assert_called_once()
mock.get.assert_called_once()
mock.get.assert_called_with('foo')
def test_current_user(self):
self.assertRaises(HTTPException, get_current_user, 'falsetoken', Mock(spec=JsonDBInterface))
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment