Skip to content
Snippets Groups Projects
Select Git revision
  • 0e72aba64361e940af04668889da4059e64490f4
  • master default protected
  • enxhi_issue460_remove_TOAR-I_access
  • michael_issue459_preprocess_german_stations
  • sh_pollutants
  • develop protected
  • release_v2.4.0
  • michael_issue450_feat_load-ifs-data
  • lukas_issue457_feat_set-config-paths-as-parameter
  • lukas_issue454_feat_use-toar-statistics-api-v2
  • lukas_issue453_refac_advanced-retry-strategy
  • lukas_issue452_bug_update-proj-version
  • lukas_issue449_refac_load-era5-data-from-toar-db
  • lukas_issue451_feat_robust-apriori-estimate-for-short-timeseries
  • lukas_issue448_feat_load-model-from-path
  • lukas_issue447_feat_store-and-load-local-clim-apriori-data
  • lukas_issue445_feat_data-insight-plot-monthly-distribution
  • lukas_issue442_feat_bias-free-evaluation
  • lukas_issue444_feat_choose-interp-method-cams
  • 414-include-crps-analysis-and-other-ens-verif-methods-or-plots
  • lukas_issue384_feat_aqw-data-handler
  • v2.4.0 protected
  • v2.3.0 protected
  • v2.2.0 protected
  • v2.1.0 protected
  • Kleinert_etal_2022_initial_submission
  • v2.0.0 protected
  • v1.5.0 protected
  • v1.4.0 protected
  • v1.3.0 protected
  • v1.2.1 protected
  • v1.2.0 protected
  • v1.1.0 protected
  • IntelliO3-ts-v1.0_R1-submit
  • v1.0.0 protected
  • v0.12.2 protected
  • v0.12.1 protected
  • v0.12.0 protected
  • v0.11.0 protected
  • v0.10.0 protected
  • IntelliO3-ts-v1.0_initial-submit
41 results

test_helpers.py

Blame
  • userdb-cli.py 4.47 KiB
    #! /usr/bin/python3
    import os, json, argparse, abc
    
    from pydantic import BaseModel
    from typing import List
    from passlib.context import CryptContext
    
    
    
    class User(BaseModel):
        """Basic user record without any credential data."""
        # Name identifying the user; required (no default).
        username: str
        # E-mail address of the user; falls back to None when not supplied.
        email: str = None
    
    
    class UserInDB(User):
        """User record as stored in the database: ``User`` plus a password hash."""
        # Hash of the user's password; falls back to None when not supplied.
        hashed_password: str = None
    
    
    class AbstractDBInterface(metaclass=abc.ABCMeta):
        """Abstract interface that every user-database backend has to implement.

        All operations are instance methods. Subclasses must override each of
        them; the class itself cannot be instantiated.
        """

        # NOTE: ``abc.abstractmethod`` is used instead of the deprecated
        # ``abc.abstractclassmethod``: the latter turns the functions into
        # classmethods, which contradicts their ``self`` parameter and the
        # instance-method overrides in concrete backends.

        @abc.abstractmethod
        def list(self) -> List:
            """Return a list with one entry per stored user."""
            raise NotImplementedError()

        @abc.abstractmethod
        def get(self, username: str):
            """Look up and return the user stored under *username*."""
            raise NotImplementedError()

        @abc.abstractmethod
        def add(self, user: UserInDB):
            """Store *user* in the database."""
            raise NotImplementedError()

        @abc.abstractmethod
        def delete(self, username: str):
            """Remove the user stored under *username* from the database."""
            raise NotImplementedError()
    
    
    class JsonDBInterface(AbstractDBInterface):
        """User database kept in a single JSON file.

        The file contains one JSON object mapping each username to the
        dictionary form of its ``UserInDB`` record. Every operation re-reads
        the file, and every modification rewrites it completely.
        """

        def __init__(self, filepath):
            """Attach the interface to *filepath*.

            If the path is not an existing regular file it is written with an
            empty JSON object. Otherwise the file is parsed once, so that
            invalid content is reported immediately.
            """
            self.file_path = filepath
            if os.path.isfile(self.file_path):
                # Result is discarded: this read only validates the file.
                self.__read_all()
            else:
                self.__save_all({})

        def __read_all(self):
            """Parse the database file and return its complete content."""
            with open(self.file_path, 'r') as db_file:
                content = json.load(db_file)
            return content

        def __save_all(self, data):
            """Replace the content of the database file with *data* as JSON."""
            with open(self.file_path, 'w') as db_file:
                json.dump(data, db_file)

        def list(self):
            """Return the usernames of all stored users."""
            stored = self.__read_all()
            return [*stored.keys()]

        def get(self, username: str):
            """Return the ``UserInDB`` stored under *username*, or None if absent."""
            stored = self.__read_all()
            if username in stored:
                return UserInDB(**stored[username])
            return None

        def add(self, user: UserInDB):
            """Store *user* under its username.

            Raises:
                Exception: If a user with the same username is already stored.
            """
            stored = self.__read_all()
            name = user.username
            if name in stored:
                raise Exception(f"User {user.username} already exists!")

            stored[name] = user.dict()
            self.__save_all(data=stored)

        def delete(self, username: str):
            """Remove *username* from the database; an unknown name is ignored."""
            stored = self.__read_all()
            stored.pop(username, None)
            self.__save_all(stored)
    
    
    # Module-wide passlib context used to hash passwords; bcrypt is the only
    # scheme configured.
    __pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
    
    def get_password_hash(password):
        """Hash *password* with the module-level ``__pwd_context`` and return the result."""
        hashed = __pwd_context.hash(password)
        return hashed
    
    def main(args):
        """Run the operation requested on the command line.

        Args:
            args: Parsed command-line arguments. The attributes read are
                ``operation``, ``userdb_path``, ``username``, ``mail``,
                ``password`` and ``bcrypt_hash``.

        Raises:
            ValueError: If an option required by the operation is missing, or
                if the operation is not one of hash, ls, add, show, rm.
        """
        operation = args.operation

        if 'hash' in operation:
            # Hashing does not touch the user database, so it is handled before
            # the database file is opened (opening creates a missing file).
            if not args.password:
                raise ValueError("Password is not set!")
            print(get_password_hash(args.password))
            return

        # Reject unknown operations explicitly instead of silently doing nothing.
        if not any(name in operation for name in ('ls', 'add', 'show', 'rm')):
            raise ValueError(f"Unknown operation: {operation!r}")

        userdb = JsonDBInterface(args.userdb_path)

        if 'ls' in operation:
            for user in userdb.list():
                print(user)
        elif 'add' in operation:
            if not args.username:
                raise ValueError("Username is not set!")
            if not args.mail:
                raise ValueError("Mail is not set!")
            # A ready-made bcrypt hash takes precedence over a plain password.
            password_hash = args.bcrypt_hash
            if not password_hash:
                if not args.password:
                    raise ValueError("No Password or hash given!")
                password_hash = get_password_hash(args.password)

            user = UserInDB(username=args.username, email=args.mail, hashed_password=password_hash)
            userdb.add(user)
            print("new User added:")
            print(user)
        elif 'show' in operation:
            if not args.username:
                raise ValueError("Username is not set!")
            user = userdb.get(args.username)
            print(user)
        elif 'rm' in operation:
            if not args.username:
                raise ValueError("Username is not set!")
            user = userdb.get(args.username)
            print("Deleting the following user:")
            print(user)
            userdb.delete(args.username)
    
    
    if __name__ == "__main__":
        # Script entry point: describe the command line, parse it, and hand the
        # result to main().
        operation_help = "\
    hash \tReturn a bcrypt hash for the given password. Requires -p. \n\
    ls \tLists all Users in the userdb. \n\
    add \tAdds a new user to the userdb. Requires -u, -m and either -p or -b. \n\
    show \tShows a single user from the userdb. Requires -u. \n\
    rm \tDeletes a single user from the userdb. Requires -u. \
    "
        parser = argparse.ArgumentParser(
            "userdb-cli.py",
            description="CLI for a userdb.json for the datacatalog-apiserver.",
            formatter_class=argparse.RawTextHelpFormatter,
        )
        parser.add_argument("operation", type=str, nargs=1, help=operation_help)

        # Optional flags, registered in the order they appear in the help output.
        option_specs = (
            ("-u", "--username", "The username that should be modified"),
            ("-m", "--mail", "The email of a newly created user."),
            ("-p", "--password", "The password of a newly created user."),
            ("-b", "--bcrypt-hash", "The bcrypt password-hash of a newly created user."),
        )
        for short_flag, long_flag, help_text in option_specs:
            parser.add_argument(short_flag, long_flag, help=help_text)

        parser.add_argument(
            "userdb_path",
            type=str,
            nargs='?',
            help="The path to the userdb to be modified or created.",
            default="./userdb.json",
        )
        args = parser.parse_args()
        main(args)