diff --git a/toardb/data/crud.py b/toardb/data/crud.py index ccfebf844061bb17eb268bcab59766d8264b982c..af2404f58cbf3966a9bf8f02b2ae4a4eb305b09c 100644 --- a/toardb/data/crud.py +++ b/toardb/data/crud.py @@ -22,6 +22,7 @@ from toardb.timeseries.models import TimeseriesChangelog from toardb.timeseries.crud import get_timeseries_by_unique_constraints from toardb.generic.models import CL_enum from toardb.utils.utils import get_value_from_str, get_str_from_value +from autoqc.autoqc.autoqc_runner import run_autoqc import datetime as dt import pandas as pd from io import StringIO @@ -75,6 +76,57 @@ def create_data_record(db: Session, engine: Engine, return JSONResponse(status_code=status_code, content=message) +def insert_dataframe(db: Session, engine: Engine, df: pd.DataFrame): + # df: pandas.DataFrame + # index: datetime + # 1st column: value + # 2nd column: flags + # 3rd column: timeseries_id + # 4th column: version + # test_config taken from autoqc/demo_program.py: + test_config = [{'range_test': {'low_thres': -5, 'high_thres': 998}}, + {'sigma_test': {'sigma': 3}}, + {'constant_value_test': {'first_thres': 5, + 'second_thres': 10}, + 'positive_spike_test': {'window': 100, 'sigma': 3}, + 'negative_spike_test': {'window': 100, 'sigma': 3}}] + # get flags from autoqc + # autoqc expects pandas.Series + # --> if getting pandas.DataFrame, it will take index and first column + # (other columns will be ignored) + flags = run_autoqc(test_config, df, ok_limit=0.85, questionable_limit=0.6) + # get flags related to given datetimes (no NaN values) + flags.name = 'autoqc_flag' + # datetime needs timezone information + flags = flags.tz_localize('UTC') + # combine autoqc flags with flags given by provider + # !!!!TBD!!!!
+ # for now: + flags = flags.where(flags == ' OK ','OKValidatedQCPassed') + # exchange combined flags in dataframe + df = df.join(flags) + flag_num = [get_value_from_str(DF_enum,flag.strip()) for flag in df['autoqc_flag']] + del df['flags'] + del df['autoqc_flag'] + df.insert(1, 'flags', flag_num) + buf = StringIO() + df.to_csv(buf, header=False) + buf.pos = 0 + buf.seek(0) + fake_conn = engine.raw_connection() + fake_cur = fake_conn.cursor() + try: + fake_cur.copy_from(buf, 'data', sep=',', columns=('datetime','value','flags','timeseries_id', 'version')) + fake_conn.commit() + message = 'Data successfully inserted.' + status_code = 200 + except Exception: + e = sys.exc_info()[0] + message = 'An error occurred in data insertion: %s' % (e,) + status_code = 400 + return JSONResponse(status_code=status_code, content=message) + + + def create_data(db: Session, engine: Engine, input_handle: UploadFile = File(...)): # a timeseries is defined by the unique_constraint of (station_id, variable_id, label, resource_provider) # station_id: from header # variable_id: from header # label: from header # resource_provider: from header @@ -117,47 +169,21 @@ def create_data(db: Session, engine: Engine, input_handle: UploadFile = File(... df.insert(3, 'version', version) # datetime needs timezone information df = df.tz_localize('UTC') - buf = StringIO() - df.to_csv(buf, header=False) - buf.pos = 0 - buf.seek(0) - fake_conn = engine.raw_connection() - fake_cur = fake_conn.cursor() - try: - fake_cur.copy_from(buf, 'data', sep=',', columns=('datetime','value','flags','timeseries_id', 'version')) - fake_conn.commit() - message = 'Data successfully inserted.'
- status_code = 200 - except: - e = sys.exc_info()[0] - message = 'An error occurred in data insertion: %s' % (e,) - status_code = 400 + insert_result = insert_dataframe(db, engine, df) else: message = f'Timeseries not found for station {station_code.strip()}, variable {variable_name}, label {label.strip()}' status_code = 400 - return JSONResponse(status_code=status_code, content=message) + insert_result = JSONResponse(status_code=status_code, content=message) + return insert_result + def create_bulk_data(db: Session, engine: Engine, bulk: List[schemas.DataCreate]): df = pd.DataFrame([x.dict() for x in bulk]).set_index("datetime") flag_num = [get_value_from_str(DF_enum,flag) for flag in df['flags']] del df['flags'] df.insert(1, 'flags', flag_num) - buf = StringIO() - df.to_csv(buf, header=False) - buf.pos = 0 - buf.seek(0) - fake_conn = engine.raw_connection() - fake_cur = fake_conn.cursor() - try: - fake_cur.copy_from(buf, 'data', sep=',', columns=('datetime','value','flags','timeseries_id', 'version')) - fake_conn.commit() - message = 'Data successfully inserted.' - status_code = 200 - except: - e = sys.exc_info()[0] - message = 'An error occurred in data insertion: %s' % (e,) - status_code = 400 - return JSONResponse(status_code=status_code, content=message) + insert_result = insert_dataframe(db, engine, df) + return insert_result def patch_data(db: Session, engine: Engine, description: str, version: str, input_handle: UploadFile = File(...)): # a timeseries is defined by the unique_constraint of (station_id, variable_id, label)