diff --git a/toardb/data/crud.py b/toardb/data/crud.py
index 6afd7ffcf750e093ebdd8b9fd6cde49d662c3a2d..a2029248f467f95f59c37b4bd886843288803b12 100644
--- a/toardb/data/crud.py
+++ b/toardb/data/crud.py
@@ -5,15 +5,21 @@
 Create, Read, Update, Delete functionality
 """
 import sys
+from sqlalchemy import insert, delete, select, and_
 from sqlalchemy.orm import Session
 from sqlalchemy.engine import Engine
+from geoalchemy2.elements import WKBElement, WKTElement
 from fastapi import File, UploadFile
 from fastapi.responses import JSONResponse
 from . import models, schemas
 from toardb.variables import models as variables_models
 from toardb.stationmeta import models as stationmeta_models
 from toardb.stationmeta.crud import get_stationmeta_core
+from toardb.stationmeta.schemas import get_coordinates_from_geom, get_geom_from_coordinates
+from toardb.timeseries.models import TimeseriesChangelog
 from toardb.timeseries.crud import get_timeseries_by_unique_constraints
+from toardb.generic.models import CL_enum
+from toardb.utils.utils import get_value_from_str, get_str_from_value
 import datetime as dt
 import pandas as pd
 from io import StringIO
@@ -91,3 +97,102 @@ def create_data(db: Session, engine: Engine, input_handle: UploadFile = File(...
         status_code = 400
     return JSONResponse(status_code=status_code, content=message)
 
+def patch_data(db: Session, engine: Engine, description: str, version: str, input_handle: UploadFile = File(...)):
+    # a timeseries is defined by the unique constraint of (station_id, variable_id, label)
+    # station_id: from header
+    # variable_id: from database (via variable_name -- from filename)
+    # label: unique information for *this* timeseries (station_id, variable_id) -- at the moment: contributor_shortname
+    # get variable_name from filename
+
+    # version label has to be unique for this timeseries ==> to be checked!
+
+    variable_name = input_handle.filename.split('_')[0]
+    variable = db.query(variables_models.Variable).filter(variables_models.Variable.name == variable_name).first()
+    variable_id = variable.id
+    # get header information (station_id, contributor_shortname, timeshift_from_utc)
+    line = '#bla'
+    f = input_handle.file
+    prev = pos = 0
+    while line[0] == '#':
+        line = f.readline().decode('utf-8')
+        key = line.split(':')[0].lower().strip()
+        if key == "#station_id":
+            station_id = line.split(':')[1]
+        if key == "#contributor_shortname":
+            label = line.split(':')[1]
+        if key == "#timeshift_from_utc":
+            timeoffset = dt.timedelta(hours=float(line.split(':')[1]))
+        prev, pos = pos, f.tell()
+    f.seek(prev)
+    station_code = station_id
+    stationmeta_core = get_stationmeta_core(db=db, station_code=station_code)
+    # there is a mismatch with coordinates
+    stationmeta_core.coordinates = get_geom_from_coordinates(stationmeta_core.coordinates)
+    station_id = stationmeta_core.id
+    timeseries = get_timeseries_by_unique_constraints(db=db, station_id=station_id, variable_id=variable_id, label=label)
+    if timeseries:
+        timeseries_id = timeseries.id
+        # open SpooledTemporaryFile, skip header (and also try to insert timeseries_id!)
+        df = pd.read_csv(input_handle.file, comment='#', header=None, sep=';', names=["time","value","flags"], parse_dates=["time"], index_col="time")
+        # subtract timeshift to convert data to UTC
+        df.index = df.index - timeoffset
+        # now insert the timeseries_id at the end of the data frame
+        df.insert(2, 'timeseries_id', timeseries_id)
+        # also insert version
+        df.insert(3, 'version', version)
+        # datetime needs timezone information
+        df = df.tz_localize('UTC')
+        # determine period_start and period_end of data
+        period_start = min(df.index)
+        period_end = max(df.index)
+        # move data from this period to data_archive
+        # the following command will not work, because only one record is intended to be inserted
+        # db.execute(insert(models.DataArchive).from_select([models.DataArchive],
+        #            select([models.Data]).where(
+        #            and_(and_(models.Data.timeseries_id == timeseries_id,
+        #                      models.Data.datetime >= period_start),
+        #                      models.Data.datetime <= period_end))))
+        # debug: check whether the where statement is correctly executed
+        result = db.execute(select([models.Data]).where(
+                            and_(and_(models.Data.timeseries_id == timeseries_id,
+                                      models.Data.datetime >= period_start),
+                                      models.Data.datetime <= period_end)))
+        rows = result.fetchall()
+        for row in rows:
+            db_obj = models.DataArchive(datetime=row[0], value=row[1], flags=row[2], version=row[3], timeseries_id=row[4])
+            db.add(db_obj)
+        db.commit()
+        db.execute(delete(models.Data).where(
+                   and_(and_(models.Data.timeseries_id == timeseries_id,
+                             models.Data.datetime >= period_start),
+                             models.Data.datetime <= period_end)))
+        db.commit()
+        # now insert new data for this period from file
+        buf = StringIO()
+        df.to_csv(buf, header=False)
+        buf.pos = 0
+        buf.seek(0)
+        fake_conn = engine.raw_connection()
+        fake_cur = fake_conn.cursor()
+        try:
+            fake_cur.copy_from(buf, 'data', sep=',', columns=('datetime','value','flags','timeseries_id','version'))
+            fake_conn.commit()
+            message = 'Data successfully inserted.'
+            status_code = 200
+        except:
+            e = sys.exc_info()[0]
+            message = 'An error occurred in data insertion: %s' % (e,)
+            status_code = 400
+        # create changelog entry
+        # how to determine type_of_change?
+        # 4 – unspecified data value corrections (this also holds if only a single value is corrected; "unspecified" leaves room to add "specified" corrections later, e.g. from QC)
+        # 5 – replaced data with a new version
+        type_of_change = get_value_from_str(CL_enum, "UnspecifiedData")
+        db_changelog = TimeseriesChangelog(description=description, timeseries_id=timeseries_id, author_id=1, type_of_change=type_of_change,
+                                           old_value="", new_value="", period_start=period_start, period_end=period_end, version=version)
+        db.add(db_changelog)
+        db.commit()
+    else:
+        message = f'Timeseries not found for station {station_code.strip()}, variable {variable_name}, label {label.strip()}'
+        status_code = 400
+    return JSONResponse(status_code=status_code, content=message)
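Note on the commented-out bulk insert in patch_data: the move of superseded rows into data_archive could also be expressed as a single INSERT ... FROM SELECT instead of the per-row ORM loop. The sketch below is not part of this change set; it reuses the 1.3-style select([...]) seen above and assumes models.Data exposes the same five columns that copy_from writes.

    # sketch: archive the affected rows in one server-side statement;
    # the column-name list must match the order of the selected columns
    archive_stmt = models.DataArchive.__table__.insert().from_select(
        ['datetime', 'value', 'flags', 'version', 'timeseries_id'],
        select([models.Data.datetime, models.Data.value, models.Data.flags,
                models.Data.version, models.Data.timeseries_id]).where(
            and_(models.Data.timeseries_id == timeseries_id,
                 models.Data.datetime >= period_start,
                 models.Data.datetime <= period_end)))
    db.execute(archive_stmt)
    db.commit()

The result is the same as the explicit loop; the loop is simply easier to inspect record by record, which matches the debug intent noted in the code.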
diff --git a/toardb/data/data.py b/toardb/data/data.py
index f61b31f7eb225dbf675ee437f347146a1c5fbb29..ca8250f31c4006f3236d7d19076e667906d1f520 100644
--- a/toardb/data/data.py
+++ b/toardb/data/data.py
@@ -55,3 +55,8 @@ async def create_data(file: UploadFile = File(...), db: Session = Depends(get_db
         raise HTTPException(status_code=400, detail=msg)
     return response
 
+@router.patch('/data/', response_model=schemas.DataCreate)
+async def patch_data(description: str, version: str, file: UploadFile = File(...), db: Session = Depends(get_db), engine: Engine = Depends(get_engine)):
+
+    response = crud.patch_data(db, engine, description=description, version=version, input_handle=file)
+    return response
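A minimal client call against the new route could look as follows. Host, port, description, version string and file name are placeholders, not taken from the repository; the file name only has to start with the variable name, since patch_data derives variable_name from filename.split('_')[0].

    import requests

    resp = requests.patch(
        "http://localhost:8000/data/",
        params={"description": "replace suspicious values",
                "version": "000002.000000.00000000000000"},
        files={"file": open("o3_examplestation_2019.dat", "rb")},
    )
    print(resp.status_code, resp.json())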
diff --git a/toardb/data/models.py b/toardb/data/models.py
index f149ac8346b8c592d145fb3180823f16d08a118d..8b2ce7aa6b45d66863f0d32882b8b9a523336251 100644
--- a/toardb/data/models.py
+++ b/toardb/data/models.py
@@ -52,6 +52,46 @@ class Data(Base):
     # see: https://groups.google.com/forum/#!topic/sqlalchemy/YjGhE4d6K4U
     timeseries_id = Column(ForeignKey(Timeseries.id, deferrable=True, initially='DEFERRED'), nullable=False, index=True)
 
+class DataArchive(Base):
+    """ Table "public.data_archive"
+
+    +---------------+--------------------------+-----------+----------+-----------------------------------------+
+    | Column        | Type                     | Collation | Nullable | Default                                 |
+    +===============+==========================+===========+==========+=========================================+
+    | datetime      | timestamp with time zone |           | not null |                                         |
+    +---------------+--------------------------+-----------+----------+-----------------------------------------+
+    | value         | double precision         |           | not null |                                         |
+    +---------------+--------------------------+-----------+----------+-----------------------------------------+
+    | flags         | integer                  |           | not null |                                         |
+    +---------------+--------------------------+-----------+----------+-----------------------------------------+
+    | version       | character(28)            |           | not null | '000001.000000.00000000000000'::bpchar  |
+    +---------------+--------------------------+-----------+----------+-----------------------------------------+
+    | timeseries_id | integer                  |           | not null |                                         |
+    +---------------+--------------------------+-----------+----------+-----------------------------------------+
+
+    Indexes:
+        "data_archive_pkey" PRIMARY KEY, btree (timeseries_id, datetime)
+        "data_archive_datetime_idx" btree (datetime)
+        "data_archive_timeseries_id_idx" btree (timeseries_id)
+        "data_archive_value_idx" btree (value)
+    Check constraints:
+        "data_archive_flags_check" CHECK (flags >= 0)
+    Foreign-key constraints:
+        "data_archive_flags_fk_df_vocabulary_enum_val" FOREIGN KEY (flags) REFERENCES df_vocabulary(enum_val)
+        "data_archive_timeseries_id_fk_timeseries_id" FOREIGN KEY (timeseries_id) REFERENCES timeseries(id) DEFERRABLE INITIALLY DEFERRED
+    """
+
+    __tablename__ = 'data_archive'
+    __table_args__ = (
+        PrimaryKeyConstraint('timeseries_id', 'datetime'),
+    )
+
+    datetime = Column(DateTime(True), nullable=False, index=True)
+    value = Column(Float(53), nullable=False, index=True)
+    flags = Column(ForeignKey('df_vocabulary.enum_val'), nullable=False)
+    version = Column(CHAR(28), nullable=False, server_default=text("'000001.000000.00000000000000'::bpchar"))
+    timeseries_id = Column(ForeignKey(Timeseries.id, deferrable=True, initially='DEFERRED'), nullable=False, index=True)
+
 # controlled vocabulary
 
 # Data Access Rights
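Once patch_data has moved rows, the superseded values can be read back through the new model. A sketch only, assuming an open Session named db and an existing timeseries_id, queried from the crud module where the models are imported as "models":

    # sketch: list archived values for one timeseries, oldest first
    archived = (db.query(models.DataArchive)
                  .filter(models.DataArchive.timeseries_id == timeseries_id)
                  .order_by(models.DataArchive.datetime)
                  .all())
    for rec in archived:
        print(rec.datetime, rec.value, rec.flags, rec.version.strip())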