From 906653f5fd86633a47521ab4225517fa08672d4d Mon Sep 17 00:00:00 2001 From: schroeder5 <s.schroeder@fz-juelich.de> Date: Sat, 30 Jul 2022 09:28:17 +0000 Subject: [PATCH] #120: bug fix for limit/offset-issue --- toardb/stationmeta/crud.py | 102 +++---------------- toardb/timeseries/crud.py | 196 +++++-------------------------------- toardb/utils/utils.py | 93 +++++++++++++++++- 3 files changed, 129 insertions(+), 262 deletions(-) diff --git a/toardb/stationmeta/crud.py b/toardb/stationmeta/crud.py index f4266ea..734cb0c 100644 --- a/toardb/stationmeta/crud.py +++ b/toardb/stationmeta/crud.py @@ -14,21 +14,20 @@ from pydantic import ValidationError from geoalchemy2.shape import to_shape from geoalchemy2.types import Geometry from geoalchemy2.elements import WKBElement, WKTElement -from sqlalchemy import cast, Text, insert, update, delete, select, and_, String, func +from sqlalchemy import cast, Text, insert, update, delete, select, and_, String, text from sqlalchemy.orm import Session from sqlalchemy.engine import Engine from sqlalchemy.dialects.postgresql import JSONB, ARRAY -from sqlalchemy.sql.expression import func from sqlalchemy.inspection import inspect from fastapi import File, UploadFile from fastapi.responses import JSONResponse from . 
import models -from .models import StationmetaCore, StationmetaChangelog, stationmeta_core_stationmeta_roles_table, \ +from .models import StationmetaCore, StationmetaGlobal, StationmetaChangelog, stationmeta_core_stationmeta_roles_table, \ stationmeta_core_stationmeta_annotations_table from .schemas import get_coordinates_from_geom, get_geom_from_coordinates, get_abbreviation_from_code_description, \ StationmetaCreate, StationmetaPatch, Coordinates -from toardb.utils.utils import get_value_from_str, get_str_from_value, get_hr_value +from toardb.utils.utils import get_value_from_str, get_str_from_value, get_hr_value, create_filter from toardb.utils.settings import base_geodata_url import toardb @@ -79,92 +78,19 @@ def get_all_stationmeta_core(db: Session, limit: int, offset: int = 0): return db_objects -def get_all_stations_within_bounding_box (db:Session, bounding_box): - min_lat, min_lon, max_lat, max_lon = bounding_box.split(',') - bbox= f'SRID=4326;POLYGON (({min_lon} {min_lat}, {min_lon} {max_lat}, {max_lon} {max_lat}, {max_lon} {min_lat}, {min_lon} {min_lat}))' - return db.query(models.StationmetaCore).filter(StationmetaCore.coordinates.ST_Within(bbox)).all() - - # same method as above (is the above still needed?) 
def get_all_stationmeta(db: Session, path_params, query_params): - offset = 0 - limit = 10 - pagination_params = ["limit", "offset"] - len_query = len(query_params) - if "offset" in query_params: - offset= int(query_params["offset"]) - len_query -= 1 - if "limit" in query_params: - limit = int(query_params["limit"]) - len_query -= 1 - if not query_params or len_query == 0: - db_objects = db.query(models.StationmetaCore).order_by(models.StationmetaCore.id).offset(offset).limit(limit).all() - else: - # determine allowed query parameters - gis_params = ["bounding_box", "altitude_range"] - core_params = [column.name for column in inspect(models.StationmetaCore).c] + gis_params - global_params = [column.name for column in inspect(models.StationmetaGlobal).c if column.name not in ['id','station_id']] - allowed_params = core_params + global_params + pagination_params - set_db_objects = set(db.query(models.StationmetaCore).all()) - for param in query_params: - if param in allowed_params: - # special case for codes, which needs to be a list of lists - if param != "codes": - values = [v for v in query_params[param].split(',')] - else: - values = [ [ v ] for v in query_params[param].split(',') ] - if param in core_params: - #check for parameters of the controlled vocabulary - if param == "climatic_zone": - values = [get_value_from_str(toardb.toardb.CZ_vocabulary,v) for v in query_params[param].split(',')] - elif param == "coordinate_validation_status": - values = [get_value_from_str(toardb.toardb.CV_vocabulary,v) for v in query_params[param].split(',')] - elif param == "country": - values = [get_value_from_str(toardb.toardb.CN_vocabulary,v) for v in query_params[param].split(',')] - elif param == "type": - values = [get_value_from_str(toardb.toardb.ST_vocabulary,v) for v in query_params[param].split(',')] - elif param == "type_of_area": - values = [get_value_from_str(toardb.toardb.TA_vocabulary,v) for v in query_params[param].split(',')] - else: - pass - if param == "name": - 
ndb_objects = set() - for v in values: - search = "%{}%".format(v.lower()) - ndb_objects |= set(db.query(models.StationmetaCore).filter(func.lower(models.StationmetaCore.name).like(search)).all()) - elif param == "bounding_box": - # bounding_box information is given as: min_lat, min_lon, max_lat, max_lon - ndb_objects = set(get_all_stations_within_bounding_box(db, query_params[param])) - elif param == "altitude_range": - ndb_objects = set(db.query(models.StationmetaCore).filter(StationmetaCore.coordinates.ST_Z().between(values[0],values[1])).all()) - else: - ndb_objects = set(db.query(models.StationmetaCore).filter(getattr(models.StationmetaCore,param).in_(values)).all()) - else: - #check for parameters of the controlled vocabulary - if param == "climatic_zone_year2016": - values = [get_value_from_str(toardb.toardb.CZ_vocabulary,v) for v in query_params[param].split(',')] - elif param == "toar1_category": - values = [get_value_from_str(toardb.toardb.TC_vocabulary,v) for v in query_params[param].split(',')] - elif param == "htap_region_tier1_year2010": - values = [get_value_from_str(toardb.toardb.TR_vocabulary,v) for v in query_params[param].split(',')] - elif param == "dominant_landcover_year2012": - values = [get_value_from_str(toardb.toardb.LC_vocabulary,v) for v in query_params[param].split(',')] - elif param == "dominant_ecoregion_year2017": - values = [get_value_from_str(toardb.toardb.ER_vocabulary,v) for v in query_params[param].split(',')] - elif param in pagination_params: - continue - else: - pass - ndb_objects = set(db.query(models.StationmetaCore).filter(and_( \ - models.StationmetaCore.id == models.StationmetaGlobal.station_id, - getattr(models.StationmetaGlobal,param).in_(values))). \ - all()) - set_db_objects &= ndb_objects - else: #inform user, that an unknown parameter name was used (this could be a typo and falsify the result!) - status_code=400 - message=f"An unknown argument was received: {param}." 
- return JSONResponse(status_code=status_code, content=message) - db_objects = list(set_db_objects) + try: + limit, offset, t_filter, s_c_filter, s_g_filter = create_filter(query_params, "stationmeta") + except KeyError as e: + status_code=400 + return JSONResponse(status_code=status_code, content=str(e)) + + db_objects = db.query(models.StationmetaCore).filter(text(s_c_filter)). \ + join(StationmetaGlobal).filter(and_(StationmetaCore.id == StationmetaGlobal.station_id, text(s_g_filter))). \ + order_by(models.StationmetaCore.id). \ + limit(limit).offset(offset).all() + for db_object in db_objects: # there is a mismatch with coordinates and additional_metadata if isinstance(db_object.coordinates, (WKBElement, WKTElement)): diff --git a/toardb/timeseries/crud.py b/toardb/timeseries/crud.py index 7676adb..b9ab71a 100644 --- a/toardb/timeseries/crud.py +++ b/toardb/timeseries/crud.py @@ -6,9 +6,8 @@ Create, Read, Update, Delete functionality """ -from sqlalchemy import insert, and_, func +from sqlalchemy import insert, and_, func, text from sqlalchemy.orm import Session -from sqlalchemy.inspection import inspect from geoalchemy2.elements import WKBElement, WKTElement from fastapi.responses import JSONResponse from datetime import datetime @@ -23,7 +22,7 @@ from toardb.contacts.crud import get_organisation_by_name, get_contact from toardb.contacts.models import Organisation from toardb.variables.crud import get_variable from .schemas import TimeseriesCreate, TimeseriesPatch, TimeseriesRoleNoCreate -from toardb.utils.utils import get_value_from_str, get_str_from_value +from toardb.utils.utils import get_value_from_str, get_str_from_value, create_filter import toardb @@ -69,124 +68,19 @@ def get_citation(db: Session, timeseries_id: int): return acknowledgement, citation -# boundig_box argument used for timeseries... 
-def get_all_timeseries_from_stations_within_bounding_box (db:Session, bounding_box): - min_lat, min_lon, max_lat, max_lon = bounding_box.split(',') - bbox= f'SRID=4326;POLYGON (({min_lon} {min_lat}, {min_lon} {max_lat}, {max_lon} {max_lat}, {max_lon} {min_lat}, {min_lon} {min_lat}))' - return db.query(models.Timeseries).join(StationmetaCore). \ - filter(StationmetaCore.coordinates.ST_Within(bbox)). \ - filter(models.Timeseries.station_id == StationmetaCore.id). \ - order_by(models.Timeseries.id).all() - - -#first search method def search_all(db, path_params, query_params): - offset = 0 - limit = 10 - pagination_params = ["limit", "offset"] - len_query = len(query_params) - if "offset" in query_params: - offset= int(query_params["offset"]) - len_query -= 1 - if "limit" in query_params: - limit = int(query_params["limit"]) - len_query -= 1 - if not query_params or len_query == 0: - db_objects = db.query(models.Timeseries).order_by(models.Timeseries.id).offset(offset).limit(limit).all() - else: - # determine allowed query parameters (first *only* from Timeseries) - timeseries_params = [column.name for column in inspect(models.Timeseries).c] - gis_params = ["bounding_box", "altitude_range"] - core_params = [column.name for column in inspect(StationmetaCore).c] - global_params = [column.name for column in inspect(StationmetaGlobal).c if column.name not in ['id','station_id']] - # determine allowed query parameters - allowed_params = timeseries_params + core_params + global_params + pagination_params + gis_params - set_db_objects = set(db.query(models.Timeseries).all()) - for param in query_params: - if param in allowed_params: - # special case for codes, which needs to be a list of lists - if param != "codes": - values = [v for v in query_params[param].split(',')] - else: - values = [ [ v ] for v in query_params[param].split(',') ] - if param in core_params or param in timeseries_params: - #check for parameters of the controlled vocabulary - if param == 
"climatic_zone": - values = [get_value_from_str(toardb.toardb.CZ_vocabulary,v) for v in query_params[param].split(',')] - elif param == "coordinate_validation_status": - values = [get_value_from_str(toardb.toardb.CV_vocabulary,v) for v in query_params[param].split(',')] - elif param == "country": - values = [get_value_from_str(toardb.toardb.CN_vocabulary,v) for v in query_params[param].split(',')] - elif param == "type": - values = [get_value_from_str(toardb.toardb.ST_vocabulary,v) for v in query_params[param].split(',')] - elif param == "type_of_area": - values = [get_value_from_str(toardb.toardb.TA_vocabulary,v) for v in query_params[param].split(',')] - elif param == "climatic_zone_year2016": - values = [get_value_from_str(toardb.toardb.CZ_vocabulary,v) for v in query_params[param].split(',')] - elif param == "toar1_category": - values = [get_value_from_str(toardb.toardb.TC_vocabulary,v) for v in query_params[param].split(',')] - elif param == "htap_region_tier1_year2010": - values = [get_value_from_str(toardb.toardb.TR_vocabulary,v) for v in query_params[param].split(',')] - elif param == "dominant_landcover_year2012": - values = [get_value_from_str(toardb.toardb.LC_vocabulary,v) for v in query_params[param].split(',')] - elif param == "dominant_ecoregion_year2017": - values = [get_value_from_str(toardb.toardb.ER_vocabulary,v) for v in query_params[param].split(',')] - elif param == "sampling_frequency": - values = [get_value_from_str(toardb.toardb.SF_vocabulary,v) for v in query_params[param].split(',')] - elif param == "aggregation": - values = [get_value_from_str(toardb.toardb.AT_vocabulary,v) for v in query_params[param].split(',')] - elif param == "data_origin_type": - values = [get_value_from_str(toardb.toardb.OT_vocabulary,v) for v in query_params[param].split(',')] - elif param == "data_origin": - values = [get_value_from_str(toardb.toardb.DO_vocabulary,v) for v in query_params[param].split(',')] - elif param in pagination_params: - continue - else: - 
pass - else: - pass - - if param in timeseries_params: - ndb_objects = set(db.query(models.Timeseries).join(StationmetaCore). \ - filter(getattr(models.Timeseries,param).in_(values)). \ - filter(models.Timeseries.station_id == StationmetaCore.id). \ - order_by(models.Timeseries.id).all()) - elif param == "bounding_box": - # bounding_box information is given as: min_lat, min_lon, max_lat, max_lon - ndb_objects = set(get_all_timeseries_from_stations_within_bounding_box(db, query_params[param])) - elif param == "altitude_range": - ndb_objects = set(db.query(models.Timeseries).join(StationmetaCore). \ - filter(StationmetaCore.coordinates.ST_Z().between(values[0],values[1])). \ - filter(models.Timeseries.station_id == StationmetaCore.id). \ - order_by(models.Timeseries.id).all()) - elif param in core_params: - if param == "name": - ndb_objects = set() - for v in values: - search = "%{}%".format(v.lower()) - ndb_objects |= set(db.query(models.Timeseries).join(StationmetaCore). \ - filter(func.lower(StationmetaCore.name).like(search)). \ - filter(models.Timeseries.station_id == StationmetaCore.id). \ - order_by(models.Timeseries.id).all()) - else: - ndb_objects = set(db.query(models.Timeseries).join(StationmetaCore). \ - filter(getattr(StationmetaCore,param).in_(values)). \ - filter(models.Timeseries.station_id == StationmetaCore.id). \ - order_by(models.Timeseries.id).all()) - elif param in pagination_params: - pass - else: - ndb_objects = set(db.query(models.Timeseries).join(StationmetaCore). \ - filter(and_(StationmetaCore.id == StationmetaGlobal.station_id, \ - getattr(StationmetaGlobal,param).in_(values))). \ - filter(models.Timeseries.station_id == StationmetaCore.id). \ - order_by(models.Timeseries.id).all()) - set_db_objects &= ndb_objects - else: #inform user, that an unknown parameter name was used (this could be a typo and falsify the result!) - status_code=400 - message=f"An unknown argument was received: {param}." 
- return JSONResponse(status_code=status_code, content=message) - db_objects = list(set_db_objects) + try: + limit, offset, t_filter, s_c_filter, s_g_filter = create_filter(query_params, "search") + except KeyError as e: + status_code=400 + return JSONResponse(status_code=status_code, content=str(e)) + + db_objects = db.query(models.Timeseries).filter(text(t_filter)). \ + join(StationmetaCore).filter(and_(models.Timeseries.station_id == StationmetaCore.id, text(s_c_filter))). \ + join(StationmetaGlobal).filter(and_(StationmetaCore.id == StationmetaGlobal.station_id, text(s_g_filter))). \ + order_by(models.Timeseries.id). \ + limit(limit).offset(offset).all() + for db_object in db_objects: # there is a mismatch with additional_metadata db_object.additional_metadata = str(db_object.additional_metadata).replace("'",'"') @@ -194,62 +88,18 @@ def search_all(db, path_params, query_params): if isinstance(db_object.station.coordinates, (WKBElement, WKTElement)): db_object.station.coordinates = get_coordinates_from_geom(db_object.station.coordinates) db_object.station.additional_metadata = str(db_object.station.additional_metadata).replace("'",'"') - return sorted(db_objects, key=lambda o: o.id)[offset:offset+limit] + return db_objects #def get_all_timeseries(db: Session, limit: int, offset: int, station_code: str): def get_all_timeseries(db, path_params, query_params): - offset = 0 - limit = 10 - pagination_params = ["limit", "offset"] - len_query = len(query_params) - if "offset" in query_params: - offset= int(query_params["offset"]) - len_query -= 1 - if "limit" in query_params: - limit = int(query_params["limit"]) - len_query -= 1 - if not query_params or len_query == 0: - db_objects = db.query(models.Timeseries).order_by(models.Timeseries.id).offset(offset).limit(limit).all() - else: - # determine allowed query parameters (first *only* from Timeseries) - timeseries_columns = [column.name for column in inspect(models.Timeseries).c] - # query parameter already supports 
station_code - allowed_params = timeseries_columns + pagination_params + ["station_code"] - set_db_objects = set(db.query(models.Timeseries).all()) - for param in query_params: - if param in allowed_params: - values = [v for v in query_params[param].split(',')] - if param == "station_code": - station_ids = [] - for station_code in values: - stationmeta_core = get_stationmeta_core(db=db,station_code=station_code) - station_id = stationmeta_core.id - station_ids.append(station_id) - ndb_objects = set(db.query(models.Timeseries).filter(models.Timeseries.station_id.in_(station_ids)). \ - order_by(models.Timeseries.id).all()) - else: - #check for parameters of the controlled vocabulary - if param == "sampling_frequency": - values = [get_value_from_str(toardb.toardb.SF_vocabulary,v) for v in query_params[param].split(',')] - elif param == "aggregation": - values = [get_value_from_str(toardb.toardb.AT_vocabulary,v) for v in query_params[param].split(',')] - elif param == "data_origin_type": - values = [get_value_from_str(toardb.toardb.OT_vocabulary,v) for v in query_params[param].split(',')] - elif param == "data_origin": - values = [get_value_from_str(toardb.toardb.DO_vocabulary,v) for v in query_params[param].split(',')] - elif param in pagination_params: - continue - else: - pass - ndb_objects = set(db.query(models.Timeseries).filter(getattr(models.Timeseries,param).in_(values)).\ - order_by(models.Timeseries.id).all()) - set_db_objects &= ndb_objects - else: #inform user, that an unknown parameter name was used (this could be a typo and falsify the result!) - status_code=400 - message=f"An unknown argument was received: {param}." 
- return JSONResponse(status_code=status_code, content=message) - db_objects = list(set_db_objects) + try: + limit, offset, t_filter, s_c_filter, s_g_filter = create_filter(query_params, "timeseries") + except KeyError as e: + status_code=400 + return JSONResponse(status_code=status_code, content=str(e)) + db_objects = db.query(models.Timeseries).filter(text(t_filter)).order_by(models.Timeseries.id). \ + limit(limit).offset(offset).all() for db_object in db_objects: # there is a mismatch with additional_metadata db_object.additional_metadata = str(db_object.additional_metadata).replace("'",'"') @@ -257,7 +107,7 @@ def get_all_timeseries(db, path_params, query_params): if isinstance(db_object.station.coordinates, (WKBElement, WKTElement)): db_object.station.coordinates = get_coordinates_from_geom(db_object.station.coordinates) db_object.station.additional_metadata = str(db_object.station.additional_metadata).replace("'",'"') - return sorted(db_objects, key=lambda o: o.id)[offset:offset+limit] + return db_objects def get_timeseries_by_unique_constraints(db: Session, station_id: int, variable_id: int, resource_provider: str = None, diff --git a/toardb/utils/utils.py b/toardb/utils/utils.py index 26f7048..bc0a5a4 100644 --- a/toardb/utils/utils.py +++ b/toardb/utils/utils.py @@ -7,10 +7,13 @@ Helper functions for TOAR database """ from sqlalchemy import Table from sqlalchemy.orm import Session +from sqlalchemy.inspection import inspect from collections import namedtuple import requests from toardb.utils.settings import base_geodata_url +from toardb.timeseries.models import Timeseries +from toardb.stationmeta.models import StationmetaCore, StationmetaGlobal import toardb # function to return code for given value @@ -40,6 +43,95 @@ def get_hr_value(table_str,field,value): value = get_str_from_value(vocabulary,int(value)) return value +# +def create_filter(query_params, endpoint): + + # determine allowed query parameters (first *only* from Timeseries) + timeseries_params 
= {column.name for column in inspect(Timeseries).c} | {'station_code'} + gis_params = {"bounding_box", "altitude_range"} + core_params = {column.name for column in inspect(StationmetaCore).c} + global_params = {column.name for column in inspect(StationmetaGlobal).c if column.name not in ['id','station_id']} + + # pagination + offset= int(query_params.get("offset", 0)) + limit = int(query_params.get("limit", 10)) + + allowed_params = {"limit", "offset"} + if endpoint in {'stationmeta'}: + allowed_params |= gis_params | core_params | global_params + elif endpoint in {'timeseries'}: + allowed_params |= timeseries_params + elif endpoint in {'search'}: + allowed_params |= gis_params | core_params | global_params | timeseries_params + else: + raise ValueError(f"Wrong endpoint given: {endpoint}") + + t_filter = [] + s_c_filter = [] + s_g_filter = [] + # query_params is a multi-dict! + for param in query_params: + if param not in allowed_params: #inform user, that an unknown parameter name was used (this could be a typo and falsify the result!) 
+ raise KeyError(f"An unknown argument was received: {param}.") + if param in {"limit", "offset"}: + continue + values = [item for v in query_params.getlist(param) for item in v.split(',')] + if param in timeseries_params: + #check for parameters of the controlled vocabulary + if param == "sampling_frequency": + values = [get_value_from_str(toardb.toardb.SF_vocabulary,v) for v in values] + elif param == "aggregation": + values = [get_value_from_str(toardb.toardb.AT_vocabulary,v) for v in values] + elif param == "data_origin_type": + values = [get_value_from_str(toardb.toardb.OT_vocabulary,v) for v in values] + elif param == "data_origin": + values = [get_value_from_str(toardb.toardb.DO_vocabulary,v) for v in values] + t_filter.append(f"timeseries.{param} IN {values}") + elif param in core_params: + #check for parameters of the controlled vocabulary + if param == "timezone": + values = [get_value_from_str(toardb.toardb.TZ_vocabulary,v) for v in values] + elif param == "coordinate_validation_status": + values = [get_value_from_str(toardb.toardb.CV_vocabulary,v) for v in values] + elif param == "country": + values = [get_value_from_str(toardb.toardb.CN_vocabulary,v) for v in values] + elif param == "type": + values = [get_value_from_str(toardb.toardb.ST_vocabulary,v) for v in values] + elif param == "type_of_area": + values = [get_value_from_str(toardb.toardb.TA_vocabulary,v) for v in values] + # exceptions for special fields (codes, name) + if param == 'codes': + values = ", ".join([f"'{{{v}}}'" for v in values]) + s_c_filter.append(f"stationmeta_core.codes IN ({values})") + elif param == 'name': + s_c_filter.append(f"LOWER(stationmeta_core.name) LIKE '%{values[0].lower()}%'") + else: + s_c_filter.append(f"stationmeta_core.{param} IN {values}") + elif param in global_params: + if param == "climatic_zone_year2016": + values = [get_value_from_str(toardb.toardb.CZ_vocabulary,v) for v in values] + elif param == "toar1_category": + values = 
[get_value_from_str(toardb.toardb.TC_vocabulary,v) for v in values] + elif param == "htap_region_tier1_year2010": + values = [get_value_from_str(toardb.toardb.TR_vocabulary,v) for v in values] + elif param == "dominant_landcover_year2012": + values = [get_value_from_str(toardb.toardb.LC_vocabulary,v) for v in values] + elif param == "dominant_ecoregion_year2017": + values = [get_value_from_str(toardb.toardb.ER_vocabulary,v) for v in values] + s_g_filter.append(f"stationmeta_global.{param} IN {values}") + elif param in gis_params: + if param == "bounding_box": + min_lat, min_lon, max_lat, max_lon = values + bbox= f'SRID=4326;POLYGON (({min_lon} {min_lat}, {min_lon} {max_lat}, {max_lon} {max_lat}, {max_lon} {min_lat}, {min_lon} {min_lat}))' + s_c_filter.append(f"ST_CONTAINS(ST_GeomFromEWKT('{bbox}'), coordinates)") + else: + s_c_filter.append(f"ST_Z(coordinates) BETWEEN {values[0]} AND {values[1]}") + + t_filter = " AND ".join(t_filter).replace('[','(').replace(']',')') + s_c_filter = " AND ".join(s_c_filter).replace('[','(').replace(']',')') + s_g_filter = " AND ".join(s_g_filter).replace('[','(').replace(']',')') + return limit, offset, t_filter, s_c_filter, s_g_filter + ### # Rasdaman does not run stable! @@ -47,7 +139,6 @@ def get_hr_value(table_str,field,value): # # only activate the following lines if you want to update pages! ### - # also get provenance information geodata_services = [ 'topography_srtm', 'ecoregion', 'stable_nightlights', 'climatic_zone', 'nox_emissions', 'landcover', -- GitLab