diff --git a/harvesting/get_UBA_NRT_data_withAutoInsert.py b/harvesting/get_UBA_NRT_data_withAutoInsert.py
new file mode 100644
index 0000000000000000000000000000000000000000..e2ba1674509c43da1a8584f8c076dcf5b43b9b10
--- /dev/null
+++ b/harvesting/get_UBA_NRT_data_withAutoInsert.py
@@ -0,0 +1,631 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+#
+# ignore warnings about insecure SSL:
+# python -W ignore get_UBA_NRT_data_withAutoInsert.py
+#
+import os
+import urllib
+import psycopg2
+import csv
+import re
+import datetime as dt
+import sys
+import operator
+import requests
+
+dir_name = "/home/s.schroeder/UBA_NRT"
+TOAR_SERVICE_URL = 'https://join-dev.fz-juelich.de:8443/'
+insecure_ssl = False
+
+url_link_stationsparameter = "wget --user Luftdaten --password 2jNmbXCjbh7xyCJqsNPaKMd4d http://www.luftdaten.umweltbundesamt.de/files/StationparameterMeta.csv >/dev/null 2>&1"
+url_link_stations = "wget --user Luftdaten --password 2jNmbXCjbh7xyCJqsNPaKMd4d http://www.luftdaten.umweltbundesamt.de/files/StationMeta.csv >/dev/null 2>&1"
+
+# Links with history:
+#url_link_stationsparameter_withHistory = "wget http://Luftdaten:2jNmbXCjbh7xyCJqsNPaKMd4d@www.luftdaten.umweltbundesamt.de/files/StationparameterMeta_with_history.csv >/dev/null 2>&1"
+#url_link_stations_withHistory = "wget http://Luftdaten:2jNmbXCjbh7xyCJqsNPaKMd4d@www.luftdaten.umweltbundesamt.de/files/StationMeta_with_history.csv >/dev/null 2>&1"
+
+# species list
+PARAMETERS_DICT = {
+    'Schwefeldioxid': 'so2',
+    'Ozon': 'o3',
+    'Stickstoffdioxid': 'no2',
+    'Stickstoffmonoxid': 'no',
+    'Kohlenmonoxid': 'co',
+    'Temperatur': 'temp',
+    'Windgeschwindigkeit': 'wspeed',
+    'Windrichtung': 'wdir',
+    'PM10': 'pm10',
+    'PM2_5': 'pm2p5',
+    'Relative Feuchte': 'relhum',
+    'Benzol': 'benzene',
+    'Ethan': 'ethane',
+    'Methan': 'ch4',
+    'Propan': 'propane',
+    'Toluol': 'toluene',
+    'o-Xylol': 'oxylene',
+    'm,p-Xylol': 'mpxylene',
+    'Luftdruck': 'press'
+}
+
+PARAMETERS_DICT_en = {
+    'Sulfur dioxide': 'so2',
+    'Ozone': 'o3',
+    'Nitrogen dioxide first measurement': 'no2',
+    'Nitrogen monoxide': 'no',
+    'Carbon monoxide': 'co',
+    'Temperature': 'temp',
+    'Wind velocity': 'wspeed',
+    'Wind direction': 'wdir',
+    'Particulate matter - PM10, first measurement': 'pm10',
+    'Particulate matter - PM2,5, first measurement': 'pm2p5',
+    'Relative humidity': 'relhum',
+    'Benzene': 'benzene',
+    'Ethane': 'ethane',
+    'Methane': 'ch4',
+    'Propane': 'propane',
+    'Toluene': 'toluene',
+    'o-Xylene': 'oxylene',
+    'm,p-Xylene': 'mpxylene',
+    'Atmospheric pressure': 'press'
+}
+
+TYPES_DICT = {
+    'Hintergrund': 'background',
+    'Industrie': 'industrial',
+    'Verkehr': 'traffic'
+}
+
+TYPES_OF_AREA_DICT = {
+    u'l\xe4ndlich abgelegen': 'rural',
+    u'l\xe4ndliches Gebiet': 'rural',
+    u'l\xe4ndlich regional': 'rural',
+    u'l\xe4ndlich stadtnah': 'rural',
+    u'st\xe4dtisches Gebiet': 'urban',
+    u'vorst\xe4dtisches Gebiet': 'suburban'
+}
+
+
+UBA_PARAMETER_UNITS = {
+    "co": "mg m-3",
+    "no": "ug m-3",
+    "no2": "ug m-3",
+    "o3": "ug m-3",
+    "so2": "ug m-3",
+    "benzene": "ug m-3",
+    "ethane": "ug m-3",
+    "ch4": "ug m-3",
+    "propane": "ug m-3",
+    "toluene": "ug m-3",
+    "oxylene": "ug m-3",
+    "mpxylene": "ug m-3",
+    "pm1": "ug m-3",
+    "pm10": "ug m-3",
+    "pm2p5": "ug m-3",
+    "press": "hPa",
+    "temp": "degree celsius",
+    "wdir": "degree",
+    "wspeed": "m s-1",
+    "relhum": "%",
+}
+
+UBA_UNIT_CONVERSION = {
+    "co": [operator.mul, 858.95],
+    "no": [operator.mul, 0.80182],
+    "no2": [operator.mul, 0.52297],
+    "o3": [operator.mul, 0.50124],
+    "so2": [operator.mul, 0.37555],
+    "benzene": [operator.mul, 0.30802],
+    "ethane": [operator.mul, 0.77698],
+    "ch4": [operator.mul, 1.49973],
+    "propane": [operator.mul, 0.52982],
+    "toluene": [operator.mul, 0.26113],
+    "oxylene": [operator.mul, 0.22662],
+    "mpxylene": [operator.mul, 0.22662],
+}
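The factors in UBA_UNIT_CONVERSION appear to be mass-concentration-to-mole-fraction conversions (roughly 24.04/M for ug m-3 to ppb at about 20 degC, times 1000 for CO, which UBA reports in mg m-3); this interpretation is not stated in the file itself. A minimal sketch of how such a factor is applied, with an invented ozone value (names and numbers below are illustrative only):

import operator

UBA_UNIT_CONVERSION = {"o3": [operator.mul, 0.50124]}   # excerpt of the table above

raw_value = 120.0                      # hypothetical hourly O3 value in ug m-3
op, factor = UBA_UNIT_CONVERSION["o3"]
converted = op(raw_value, factor)      # 120 * 0.50124 = 60.1488 (assumed target unit: ppb)
print(converted)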
"ethane": [operator.mul, 0.77698 ], + "ch4": [operator.mul, 1.49973 ], + "propane": [operator.mul, 0.52982 ], + "toluene": [operator.mul, 0.26113 ], + "oxylene": [operator.mul, 0.22662 ], + "mpxylene": [operator.mul, 0.22662 ], + } + + + +"""PARAMETER_THRESHOLDS = { "parameter": {"threshold1": [min, max], "threshold2": [min, max]}} + Add values in database only when it satisfies threshold criteria in below sequence: + 1) Remove values which are <threshold1[min] and >threshold1[max] + 2) If values are <threshold2[min] and >threshold2[max] flag them as doutful(2) else flag them as OK(0) + 3) Add it in database +""" +PARAMETER_THRESHOLDS ={ "co": {"threshold1" : [-1., 5000.], "threshold2": [-1., 1000.]}, + "relhum": {"threshold1" : [0., 110.], "threshold2": [0., 105.]}, + "no": {"threshold1" : [-100., 5000.], "threshold2": [-1., 500.]}, + "no2": {"threshold1" : [-100., 5000.], "threshold2": [-1., 500.]}, + "o3": {"threshold1" : [-80., 500.], "threshold2": [-1., 300.]}, + "pm10": {"threshold1" : [-80., 10000.], "threshold2": [-1., 2000.]}, + "pm2p5": {"threshold1" : [-10., 1000.], "threshold2": [-1., 600.]}, + "press": {"threshold1" : [1., 1100.], "threshold2": [700., 1060.]}, + "so2": {"threshold1" : [-80., 4000.], "threshold2": [-1., 1000.]}, + "temp": {"threshold1" : [-40., 50.], "threshold2": [-32., 45.]}, + "wdir": {"threshold1" : [-360., 360.], "threshold2": [0., 360.]}, + "wspeed": {"threshold1" : [0., 100.], "threshold2": [0., 100.]}, + } + +# Define states and contributors for German measurement stations +dstates = {} +dstates["BE"] = ("Berlin", "N/A", "Senatsverwaltung für Stadtentwicklung und Umwelt") +dstates["BW"] = ("Baden-Württemberg", "LUBW", + "Landesanstalt für Umwelt, Messungen und Naturschutz Baden-Württemberg") +dstates["BY"] = ("Bayern", "LFU", "Bayerisches Landesamt für Umwelt") +dstates["BB"] = ("Berlin-Brandenburg", "N/A", "Senatsverwaltung für Stadtentwicklung und Umwelt Berlin; " + \ + "Landesamt für Umwelt, Gesundheit und Verbraucherschutz (LUGV) Brandenburg") +dstates["HB"] = ("Hansestadt Bremen", "N/A", "Senat für Umwelt, Bau und Verkehr, Bremen") +dstates["HH"] = ("Hamburg", "N/A", "Institut für Hygiene und Umwelt Hamburg") +dstates["HE"] = ("Hessen", "HLUG", "Hessisches Landesamt für Umwelt und Geologie") +dstates["MV"] = ("Mecklenburg-Vorpommern", "LUNG", "Landesamt für Umwelt, Naturschutz und Geologie") +dstates["NI"] = ("Niedersachsen", "N/A", "Niedersächsiches Ministerium für Umwelt, Energie und Klimaschutz") +dstates["NW"] = ("Nordrhein-Westfalen", "LANUV", "Landesamt für Natur, Umwelt und Verbraucherschutz " + \ + "Nordrhein-Westfalen") +dstates["RP"] = ("Rheinland-Pfalz", "N/A", "Landesamt für Umwelt, Wasserwirtschaft und Gewerbeaufsicht") +dstates["SL"] = ("Saarland", "N/A", "Ministerium für Umwelt und Verbraucherschutz Saarland") +dstates["SN"] = ("Sachsen", "N/A", "Sächsisches Landesamt für Umwelt, Landwirtschaft und Geologie") +dstates["ST"] = ("Sachsen-Anhalt", "LAU", "Landesamt für Umweltschutz Sachsen-Anhalt") +dstates["SH"] = ("Schleswig-Holstein", "N/A", "Ministerium für Energiewende, Landwirtschaft, " + \ + "Umwelt und ländliche Räume") +dstates["TH"] = ("Thüringen", "N/A", "Thüringer Landesanstalt für Umwelt und Geologie") +dstates["UB"] = ("unknown", "UBA", "Umweltbundesamt") + + + +def insert_one_station(station_code_toInsert, io_handler = None): + network_name = 'UBA' + station_name = "unknown" + station_country = "Germany" + station_lat = None + station_lon = None + station_alt = None + station_category = "unknown" + station_type = 
"unknown" + station_type_of_area = "unknown" + station_timeshift = 0. + station_state = "unknown" + station_timezone = "Europe/Berlin" + stationfile = "StationMeta.csv" + #infile = open(stationfile,'rb') + infile = open(stationfile, 'r', encoding="ISO-8859-1") + csvReader = csv.reader(infile, delimiter=';', quotechar='"') + for i, values in enumerate(csvReader): + #overread header information (two lines) + if i < 2: + continue + station_code = values[0].strip() + if station_code != station_code_toInsert.strip(): + continue + station_local_id = station_code + station_name = values[1] + station_lat = float(values[13].replace(',', '.')) + station_lon = float(values[12].replace(',', '.')) + station_alt = float(values[14].replace(',', '.')) + station_type = TYPES_DICT[values[16]] + station_type_of_area = TYPES_OF_AREA_DICT[values[15]] + + # replace single quote in station name + if station_name.find("'") != -1: + station_name = station_name.replace("'", '"') + + station_state = dstates[station_code[2:4]][0] + + # create station tuple + st = (None, network_name, station_code, station_local_id, + station_type, station_type_of_area, station_category, + station_name, station_country, station_state, + station_lon, station_lat, station_alt, station_timezone) + + # insert values in database + if io_handler is not None: + # changes are automatically committed (commit=True is default) + success = io_handler.update_stations(*st, noclobber = True) + print("inserted new station %s into database!" % (station_code,)) + infile.close() + return + infile.close() + return + + +def insert_parameter_series(missing_station_code,parameter,io_handler): + + #set missing fields + numid = io_handler.get_station_code(station_code=missing_station_code, network_name="UBA")[0] + parameter_attribute = "" + parameter_contributor_shortname = dstates[missing_station_code[2:4]][1] + parameter_dataset_type = "hourly" + parameter_label_values = [parameter_attribute, + parameter_contributor_shortname, + parameter_dataset_type, + ] + parameter_label = io_handler.update_parameter_labels(numid, parameter, parameter_label_values) + parameter_sampling_type = "continuous" + parameter_original_units = UBA_PARAMETER_UNITS[parameter] + parameter_calibration = "" + parameter_contributor = dstates[missing_station_code[2:4]][2] + parameter_contributor_country = "Germany" + parameter_status = 0 + comments = "" + creation_date = dt.datetime.now() + modification_date = creation_date + # Add default start and end date temporarily and + # then update it later once the data is inserted + data_start_date = "1900-01-01 00:00:00" + data_end_date = "1900-01-01 00:00:00" + + # read missing information from file + station_parameter_metafile = "StationparameterMeta.csv" +# infile = open(station_parameter_metafile, 'rU') + infile = open(station_parameter_metafile, 'r', encoding="ISO-8859-1") + csvReader = csv.reader(infile, delimiter=';') + for i, values in enumerate(csvReader): + # overread header lines + if i < 2: + continue + station_code = values[0] + specname = values[2] + if specname in PARAMETERS_DICT.keys(): + if station_code == missing_station_code and PARAMETERS_DICT[specname] == parameter: + type_of_parameter = values[1].strip() + if type_of_parameter == "Particulate*": + parameter_sampling_type = "filter" + else: + parameter_sampling_type = "continuous" + parameter_measurement_method = values[11].strip() + + # create parameter series tuple + pst = (numid, parameter_label, parameter, None, parameter_attribute, + parameter_sampling_type, 
+                # create parameter series tuple
+                pst = (numid, parameter_label, parameter, None, parameter_attribute,
+                       parameter_sampling_type, parameter_measurement_method,
+                       parameter_original_units, parameter_calibration,
+                       parameter_contributor_shortname, parameter_contributor,
+                       parameter_contributor_country, parameter_dataset_type,
+                       parameter_status, comments, creation_date,
+                       modification_date, data_start_date, data_end_date)
+
+                # insert values in database
+                if io_handler is not None:
+                    success = io_handler.update_parameter_series(*pst, noclobber=True)
+                    if success:
+                        io_handler.commit()
+                        print("committed new parameter_series for %s and %s!" % (station_code, parameter))
+                infile.close()
+                return
+    infile.close()
+    return
+
+
+def insert_invented_parameter_series(missing_station_code, parameter, io_handler):
+
+    # set missing fields
+    numid = io_handler.get_station_code(station_code=missing_station_code, network_name="UBA")[0]
+    parameter_attribute = ""
+    parameter_contributor_shortname = dstates[missing_station_code[2:4]][1]
+    parameter_dataset_type = "hourly"
+    parameter_label_values = [parameter_attribute,
+                              parameter_contributor_shortname,
+                              parameter_dataset_type,
+                             ]
+    parameter_label = io_handler.update_parameter_labels(numid, parameter, parameter_label_values)
+    parameter_sampling_type = "continuous"
+    parameter_original_units = UBA_PARAMETER_UNITS[parameter]
+    parameter_calibration = ""
+    parameter_contributor = dstates[missing_station_code[2:4]][2]
+    parameter_contributor_country = "Germany"
+    parameter_measurement_method = "unknown"
+    parameter_status = 2
+    comments = ""
+    creation_date = dt.datetime.now()
+    modification_date = creation_date
+    # Add default start and end date temporarily and
+    # then update it later once the data is inserted
+    data_start_date = "1900-01-01 00:00:00"
+    data_end_date = "1900-01-01 00:00:00"
+
+    # create parameter series tuple
+    pst = (numid, parameter_label, parameter, None, parameter_attribute,
+           parameter_sampling_type, parameter_measurement_method,
+           parameter_original_units, parameter_calibration,
+           parameter_contributor_shortname, parameter_contributor,
+           parameter_contributor_country, parameter_dataset_type,
+           parameter_status, comments, creation_date,
+           modification_date, data_start_date, data_end_date)
+
+    # insert values in database
+    if io_handler is not None:
+        success = io_handler.update_parameter_series(*pst, noclobber=True)
+        if success:
+            io_handler.commit()
+            print("committed new parameter_series for %s and %s!" % (missing_station_code, parameter))
+    return
+
+
+def read_missing_stations(filename, io_handler=None):
+    """Read the station codes and parameters of missing stations from *filename*
+       and insert the stations and their parameter_series into the database"""
+
+    # read list of station codes
+    f = open(filename, 'r')
+    missing_station_codes = set()
+#   missing_station_codes = { 'DENW338', 'DEST105', 'DEST106', 'DEUB046', 'DERP053',
+#                             'DEMV026', 'DEBB092', 'DEBB099', 'DEBY189', 'DEBY196',
+#                             'DERP060', 'DEMV031', 'DEST108', 'DEBW156', 'DEHE112',
+#                             'DEHE116', 'DEHE131', 'DEMV025', 'DENW329', 'DENW337',
+#                             'DENW355', 'DENW367', 'DESH052', 'DESH053', 'DESH055',
+#                             'DEST112', 'DEHH079', 'DEHH081', 'DEBY123', 'DEBY124',
+#                             'DEBY187', 'DEBY188' }
+    for line in f:
+        missing_station_code, parameter = line.strip().split(' ')
+        # one station_code might report multiple parameters
+        # ==> insert station only once
+        if not missing_station_code in missing_station_codes:
+            insert_one_station(missing_station_code, io_handler)
+            missing_station_codes |= { missing_station_code }
+        # also add missing parameter_series
+        # ==> add parameter_series for every parameter
+        insert_parameter_series(missing_station_code, parameter, io_handler)
+    f.close()
+
+
+def apply_filters(parameter, value):
+    """ Apply the threshold filters to a given value.
+        If the value passes all filters, return the value and its flag,
+        else return None for both value and flag"""
+    db_val = None
+    db_flag = None
+    # flag: either OKPreliminary or DoubtfulPreliminary
+    if parameter in PARAMETER_THRESHOLDS.keys():
+        threshold1 = PARAMETER_THRESHOLDS[parameter]["threshold1"]
+        threshold2 = PARAMETER_THRESHOLDS[parameter]["threshold2"]
+        if (value > threshold1[0] and value < threshold1[1]):
+            if (value > threshold2[0] and value < threshold2[1]):
+                db_flag = "OKPreliminary"
+            else:
+                db_flag = "DoubtfulPreliminary"
+            # convert the value to the database default unit
+            if parameter in UBA_UNIT_CONVERSION.keys():
+                formula = UBA_UNIT_CONVERSION[parameter]
+                db_val = formula[0](value, formula[1])
+            else:
+                db_val = value
+    else:
+        # convert the value to the database default unit
+        if parameter in UBA_UNIT_CONVERSION.keys():
+            formula = UBA_UNIT_CONVERSION[parameter]
+            db_val = formula[0](value, formula[1])
+        else:
+            db_val = value
+        db_flag = "OKPreliminary"
+    return db_val, db_flag
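A quick illustration of the two-threshold logic in apply_filters above, assuming the module has been imported so that the function and its dictionaries are in scope (the numbers are made up): an o3 value must lie inside threshold1 (-80..500) to be kept at all, and additionally inside threshold2 (-1..300) to be flagged OKPreliminary rather than DoubtfulPreliminary:

for raw in (45.0, 320.0, 600.0):                 # hypothetical hourly O3 values in ug m-3
    db_val, db_flag = apply_filters("o3", raw)
    print(raw, db_val, db_flag)
# 45.0  -> 22.6,  "OKPreliminary"        (inside threshold1 and threshold2)
# 320.0 -> 160.4, "DoubtfulPreliminary"  (inside threshold1 only)
# 600.0 -> None,  None                   (outside threshold1, value is discarded)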
+
+
+# add data into the database
+def addData(db, csv_file):
+
+    minor = dt.datetime.now().strftime("%Y%m%d%H%M%S")
+    db_version = f'000000.000001.{minor}'
+    lfirstmiss = True
+    updatedStations = dict()
+    newStations = dict()
+
+    # The file has to be converted from DOS to Unix:
+    # In Python3, this can no longer be done by opening the file with 'rb' ('read binary').
+    # (see: https://pythonconquerstheuniverse.wordpress.com/2011/05/08/newline-conversion-in-python-3/)
+    # csvReader = csv.reader(open(csv_file, 'rb'), delimiter=';', quotechar='|')
+    fileContents = open(csv_file, "r").read()
+    f = open(csv_file, "w", newline="\n")
+    f.write(fileContents)
+    f.close()
+    csvReader = csv.reader(open(csv_file, 'r'), delimiter=';', quotechar='|')
+    for j, values in enumerate(csvReader):
+        if j < 2:
+            continue
+
+        values[1] = values[1][1:-1]
+        if values[1] in PARAMETERS_DICT.keys():
+            values[0] = values[0][1:-1]
+            values[2] = values[2][1:-1]
+            if re.match("(.{4})(.{2})(.{2})$", values[2]):
+                date_arr = re.split("(.{4})(.{2})(.{2})$", values[2])
+                year = date_arr[1]
+                month = date_arr[2]
+                day = date_arr[3]
+                station_code = values[0]
+                species = PARAMETERS_DICT[values[1]]
+                # get variable_id
+                r = requests.get(TOAR_SERVICE_URL + f'variables/{species}', verify=insecure_ssl)
+                data = r.json()
+                variable_id = data['id']
+                date_obj = dt.datetime(int(year), int(month), int(day))
+            else:
+                sys.stderr.write("error in date %s\n" % (values[2]))
+                return
+
+            # check whether station already exists in the database (and get its id)
+            r = requests.get(TOAR_SERVICE_URL + f'stationmeta/{station_code}', verify=insecure_ssl)
+            data = r.json()
+            if 'detail' in data:
+                # if station is not present in database send email and save data in new file
+                sys.stderr.write("station %s not present in database!\n" % (station_code,))
+                # for the moment: report data of missing stations to file
+                if lfirstmiss:
+                    outstationfilename = "missing_stations_" + dt.datetime.now().strftime("%Y-%m-%d_%H") + ".csv"
+                    outstationfile = open(dir_name + '/' + outstationfilename, 'w')
+                    outdatafilename = "missing_stations_data_" + dt.datetime.now().strftime("%Y-%m-%d_%H") + ".csv"
+                    outdatafile = open(dir_name + '/' + outdatafilename, 'w')
+                    lfirstmiss = False
+                outstationfile.write("%s %s\n" % (station_code, species))
+                for value in values[:-1]:
+                    outdatafile.write("%s," % value)
+                outdatafile.write("%s\n" % values[-1])
+            else:
+                numid = data['id']
+
+                # get _contributor and _contributor_shortname from dstates dict
+                k = station_code[2:4].upper()
+                parameter_contributor_shortname = dstates[k][1]
+
+                # get series id for given station_code and species
+                # (see: identify timeseries of TOAR_TG_Vol02_Data_Processing.docx)
+                # Criterion 3.1: id of the corresponding station (see Steps 2a-2c)
+                # Criterion 3.2: variable id (see Step 1)
+                # Criterion 3.3: role: resource_provider (organisation)
+                # Criterion 3.4: version number
+                # Criterion 3.5: data_source (measurement or model)
+                # Criterion 3.6: measurement method or model experiment identifier (e.g. COSMOS-REA6, COSMO-EPS, ECMWF-ERA5, etc.)
+                # Criterion 3.7: sampling height
+                # Criterion 3.8: data filtering procedures
+
+                # label should be implemented as a database function
+                # for the time being: try with other unique information (for UBA)
+
+                label = ''
+                r = requests.get(TOAR_SERVICE_URL + f'timeseries/unique/?station_id={numid}&variable_id={variable_id}&resource_provider=UBA&label={label}', verify=insecure_ssl)
+                data = r.json()
+                if 'detail' in data:
+                    # series not present yet (new species for the station)
+                    insert_parameter_series(station_code, species, db)
+                    # now get series_id from freshly inserted parameter_series
+                    res = db.search(network_name='UBA',
+                                    station_code=station_code,
+                                    parameter_name=species,
+                                    parameter_contributor_shortname=parameter_contributor_shortname,
+                                    parameter_dataset_type='hourly',
+                                    columns='id')
+                    try:
+                        series_id = res[0].id[0]
+                        newStations[series_id] = dt.datetime(int(9999), int(12), int(31))
+                    except IndexError:
+                        print("parameter_series %s for station %s not reported in 'StationparameterMeta.csv'!" % (species, station_code))
+                        print("now inventing metadata for parameter_series %s for station %s!" % (species, station_code))
+                        # series not present yet (new species for the station -- metadata to be invented!)
+                        insert_invented_parameter_series(station_code, species, db)
+                        # now get series_id from freshly inserted parameter_series
+                        res = db.search(network_name='UBA',
+                                        station_code=station_code,
+                                        parameter_name=species,
+                                        parameter_contributor_shortname=parameter_contributor_shortname,
+                                        parameter_dataset_type='hourly',
+                                        columns='id')
+                        series_id = res[0].id[0]
+                        newStations[series_id] = dt.datetime(int(9999), int(12), int(31))
+                else:
+                    series_id = data['id']
+                val_arr = []
+                for i in range(3, (len(values)-1), 2):
+                    # first filter: remove missing values (missing value = 999./-999./111./-111.)
+                    '''missing value = 999. or -999.
+                       calibration error = -111.
+                       if values[i] or values[i+1] is missing value:
+                           val = None
+                       elif values[i] and values[i+1] is calibration error:
+                           val = None
+                       elif values[i] is calibration error:
+                           val = values[i+1]
+                       elif values[i+1] is calibration error:
+                           val = values[i]
+                       else:
+                           val = average of values[i] and values[i+1]
+                    '''
+                    val = None
+                    if float(values[i]) in (999., -999.) or float(values[i+1]) in (999., -999.):
+                        continue
+                    elif float(values[i]) == -111. and float(values[i+1]) == -111.:
+                        continue
+                    elif float(values[i]) == -111.:
+                        val = float(values[i+1])
+                    elif float(values[i+1]) == -111.:
+                        val = float(values[i])
+                    else:
+                        if (values[1] == 'Windrichtung' and abs(float(values[i]) - float(values[i+1])) > 180.):
+                            wind = (float(values[i]) + float(values[i+1]) - 360.) / 2.
+                            if wind < 0.:
+                                wind += 360.
+                            val = wind
+                        else:
+                            val = (float(values[i]) + float(values[i+1])) / 2.
+
+                    if val is not None:
+                        db_val, db_flag = apply_filters(species, val)
+                        if db_val is not None:
+                            db_datetime = date_obj + dt.timedelta(hours=(i-3)/2)
+                            # insert/update in database
+                            r = requests.post(TOAR_SERVICE_URL + f'data/record/?series_id={series_id}&datetime={db_datetime}&value={db_val}&flag={db_flag}&version={db_version}', verify=insecure_ssl)
+                            data = r.json()
+                            if (series_id in updatedStations):
+                                updatedStations[series_id] = max(db_datetime, updatedStations[series_id])
+                            else:
+                                updatedStations[series_id] = db_datetime
+                            if (series_id in newStations):
+                                newStations[series_id] = min(db_datetime, newStations[series_id])
+    if not lfirstmiss:
+        outstationfile.close()
+        outdatafile.close()
+        # insert missing stations automatically
+        read_missing_stations(outstationfilename, db)
+        # still todo: insert missing stations' data automatically
+        #read_missing_data(outdatafilename, db)
+    # commit all changes at the end of the program at once! (issue of run time)
+    db.commit()
+    newStations2 = {k: v for k, v in newStations.items() if v != dt.datetime(9999, 12, 31, 0, 0)}
+    return updatedStations, newStations2
+
+
+def update_parameter_series_dates(db, updatedStations, newStations):
+    """Update data_end_date in parameter_series table
+       for all updated UBA stations and their species"""
+
+    for series_id, data_end_date in updatedStations.items():
+        db.update_parameter_series_dates(series_id=series_id,
+                                         data_end_date=data_end_date)
+    for series_id, data_start_date in newStations.items():
+        db.update_parameter_series_dates(series_id=series_id,
+                                         data_start_date=data_start_date)
+    # commit changes at end
+    db.commit()
+
+
+# download files for given urls
+def download():
+#   url_link_data = "wget http://Luftdaten:2jNmbXCjbh7xyCJqsNPaKMd4d@www.luftdaten.umweltbundesamt.de/files/uba_%s.csv >/dev/null 2>&1" % (dt.datetime.now().strftime("%Y%m%d"),)
+    url_link_data = "wget --user Luftdaten --password 2jNmbXCjbh7xyCJqsNPaKMd4d http://www.luftdaten.umweltbundesamt.de/files/uba_%s.csv >/dev/null 2>&1" % (dt.datetime.now().strftime("%Y%m%d"),)
+#   dest = url_link_data[87:103]
+    dest = url_link_data[105:121]
+    try:
+        os.system(url_link_data)
+    except:
+        sys.stderr.write("Could not retrieve data file!")
+        return ""
+
+    if not os.path.exists(dest):
+        dest = ""
+    try:
+        os.system(url_link_stationsparameter)
+        os.system(url_link_stations)
+    except:
+        sys.stderr.write("Could not retrieve metadata files!")
+        return dest
+
+    return dest
+
+
+if __name__ == "__main__":
+    # connect to database
+    with psycopg2.connect(host="zam10116.zam.kfa-juelich.de", dbname='toardb_v2', user='toarcurator') as db:
+
+        # next three lines to be deleted
+        csv_file = 'uba_20201218.csv'
+        updatedStations, newStations = addData(db, csv_file)
+        exit()
+
+        # remove old file StationparameterMeta.csv from previous download
+        if os.path.exists('StationparameterMeta.csv'):
+            os.remove('StationparameterMeta.csv')
+        # download csv files from uba site
+        csv_file = download()
+        if csv_file != "":
+            if os.path.isfile(csv_file):
+                updatedStations, newStations = addData(db, csv_file)
+                # update new data range in parameter_series
+                update_parameter_series_dates(db, updatedStations, newStations)
+        # at the end remove downloaded files
+        # don't remove StationparameterMeta.csv (it's needed for check_for_unique_parameters.sh)
+        try:
+            if os.path.exists(csv_file):
+                os.remove(csv_file)
+            if os.path.exists('StationMeta.csv'):
+                os.remove('StationMeta.csv')
+        except OSError:
+            pass
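The wind-direction branch in addData averages the two half-hourly readings on the circle rather than arithmetically whenever they are more than 180 degrees apart; the rule is easiest to see in isolation (standalone sketch, not part of the script):

def average_wdir(dir1, dir2):
    """Average two wind directions (degrees) the way addData does."""
    if abs(dir1 - dir2) > 180.:
        wind = (dir1 + dir2 - 360.) / 2.
        if wind < 0.:
            wind += 360.
        return wind
    return (dir1 + dir2) / 2.

print(average_wdir(350., 10.))   # 0.0  (a plain mean would give the misleading 180.0)
print(average_wdir(90., 110.))   # 100.0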
diff --git a/harvesting/requirements.txt b/harvesting/requirements.txt
new file mode 100644
index 0000000000000000000000000000000000000000..fbc4d16663b3ddf0d016957d33ebc40b0d360198
--- /dev/null
+++ b/harvesting/requirements.txt
@@ -0,0 +1,4 @@
+numpy==1.18.3
+pkg-resources==0.0.0
+psycopg2-binary==2.8.5
+requests==2.25.1
diff --git a/toardb/contacts/contacts.py b/toardb/contacts/contacts.py
index 210c090023711fce7393e3cd17ac7906c62dff64..a2dd70a15ddd34cebc86fcbcffc2aa6419627b80 100644
--- a/toardb/contacts/contacts.py
+++ b/toardb/contacts/contacts.py
@@ -79,3 +79,10 @@ def create_organisation(organisation: schemas.OrganisationCreate = Body(..., emb
 def get_all_contacts(skip: int = 0, limit: int = None, db: Session = Depends(get_db)):
     contacts = crud.get_all_contacts(db, skip=skip, limit=limit)
     return contacts
+
+#get a single entry of table contacts (given by its ID)
+@router.get('/contacts/id/{contact_id}', response_model=schemas.Contact)
+def get_contact(contact_id: int, db: Session = Depends(get_db)):
+    contact = crud.get_contact(db, contact_id=contact_id)
+    return contact
+
diff --git a/toardb/contacts/crud.py b/toardb/contacts/crud.py
index 883382f039b326f75981fd6e5e21b4aa78693327..737e3cd1bc9b78d0a208944d5a4684376f0028a1 100644
--- a/toardb/contacts/crud.py
+++ b/toardb/contacts/crud.py
@@ -62,3 +62,6 @@ def create_person(db: Session, person: PersonCreate):
 
 def get_all_contacts(db: Session, skip : int = 0, limit: int = None):
     return db.query(models.Contact).offset(skip).limit(limit).all()
+
+def get_contact(db: Session, contact_id: int):
+    return db.query(models.Contact).filter(models.Contact.id == contact_id).first()
diff --git a/toardb/data/crud.py b/toardb/data/crud.py
index a2029248f467f95f59c37b4bd886843288803b12..4fde077a9970793857108c334abd67bcfa59053e 100644
--- a/toardb/data/crud.py
+++ b/toardb/data/crud.py
@@ -12,6 +12,7 @@ from geoalchemy2.elements import WKBElement, WKTElement
 from fastapi import File, UploadFile
 from fastapi.responses import JSONResponse
 from . import models, schemas
+from .models import DF_enum
 from toardb.variables import models as variables_models
 from toardb.stationmeta import models as stationmeta_models
 from toardb.stationmeta.crud import get_stationmeta_core
@@ -36,6 +37,24 @@ def get_all_data(db: Session, skip : int = 0, limit: int = None):
     return db.query(models.Data).offset(skip).limit(limit).all()
 
 
+def create_data_record(db: Session, engine: Engine,
+                       series_id: int, datetime: dt.datetime,
+                       value: float, flag: str, version: str):
+    flag_num = get_value_from_str(DF_enum, flag)
+    data_dict = {"datetime": datetime,
+                 "value": value,
+                 "flags": flag_num,
+                 "version": version,
+                 "timeseries_id": series_id}
+    data = models.Data(**data_dict)
+    db.add(data)
+    result = db.commit()
+    db.refresh(data)
+    status_code = 200
+    message = 'Data successfully inserted!'
+    return JSONResponse(status_code=status_code, content=message)
+
+
 def create_data(db: Session, engine: Engine, input_handle: UploadFile = File(...)):
     # a timeseries is defined by the unique_constraint of (station_id, variable_id, label)
     # station_id: from header
diff --git a/toardb/data/data.py b/toardb/data/data.py
index ca8250f31c4006f3236d7d19076e667906d1f520..c8dfa8b8e36c21102ae87e306ec8bea02fadcb56 100644
--- a/toardb/data/data.py
+++ b/toardb/data/data.py
@@ -10,6 +10,7 @@ from sqlalchemy.orm import Session
 from sqlalchemy.engine import Engine
 from . import crud, schemas
 from toardb.utils.database import ToarDbSession, engine, get_engine, get_db
+import datetime as dt
 
 router = APIRouter()
 
@@ -55,6 +56,21 @@ async def create_data(file: UploadFile = File(...), db: Session = Depends(get_db
         raise HTTPException(status_code=400, detail=msg)
     return response
 
+@router.post('/data/record/', response_model=schemas.DataCreate)
+async def create_data_record(series_id: int, datetime: dt.datetime,
+                             value: float, flag: str, version: str,
+                             db: Session = Depends(get_db), engine: Engine = Depends(get_engine)):
+    response = crud.create_data_record(db, engine, series_id=series_id, datetime=datetime,
+                                       value=value, flag=flag, version=version)
+    if response.status_code != 200:
+        msg = response.body.decode('utf-8')
+        # try to parse error messages from DBS (to be more understandable)
+        msg2 = '"An error occurred in data insertion: <class \'psycopg2.errors.UniqueViolation\'>"'
+        if (msg == msg2):
+            msg = 'Data for timeseries already registered.'
+        raise HTTPException(status_code=400, detail=msg)
+    return response
+
 @router.patch('/data/', response_model=schemas.DataCreate)
 async def patch_data(description: str, version: str, file: UploadFile = File(...),
                      db: Session = Depends(get_db), engine: Engine = Depends(get_engine)):
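The harvester above posts one observation at a time against this new endpoint; stripped of the surrounding loop, the call reduces to something like the following (host, series id and values are placeholders, and verify=False mirrors the insecure_ssl setting of the harvester):

import datetime as dt
import requests

TOAR_SERVICE_URL = 'https://join-dev.fz-juelich.de:8443/'

params = {
    'series_id': 12345,                                   # hypothetical timeseries id
    'datetime': dt.datetime(2020, 12, 18, 7).isoformat(),
    'value': 42.3,
    'flag': 'OKPreliminary',
    'version': '000000.000001.20201218120000',
}
r = requests.post(TOAR_SERVICE_URL + 'data/record/', params=params, verify=False)
print(r.status_code, r.json())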
diff --git a/toardb/timeseries/crud.py b/toardb/timeseries/crud.py
index 9e30fb550b8c6fd8a9a6002775fdddbb1648654c..c4868d57344225b527a4236f0b635ee230b20c20 100644
--- a/toardb/timeseries/crud.py
+++ b/toardb/timeseries/crud.py
@@ -15,6 +15,7 @@ from .models import TimeseriesChangelog, timeseries_timeseries_roles_table, \
 from toardb.stationmeta.models import StationmetaCore
 from toardb.stationmeta.schemas import get_coordinates_from_geom, get_geom_from_coordinates
 from toardb.generic.models import RS_enum, RC_enum
+from toardb.contacts.crud import get_organisation_by_name, get_contact
 from .schemas import TimeseriesCreate, TimeseriesPatch
 from toardb.utils.utils import get_value_from_str, get_str_from_value
 
@@ -43,19 +44,24 @@ def get_all_timeseries(db: Session, skip : int = 0, limit: int = None):
     return db_objects
 
 
-def get_timeseries_by_unique_constraints(db: Session, station_id: int, variable_id: int, label: str):
-    db_object = db.query(models.Timeseries).filter(models.Timeseries.station_id == station_id) \
+def get_timeseries_by_unique_constraints(db: Session, station_id: int, variable_id: int, resource_provider: str, label: str):
+    ret_db_object = None
+    db_objects = db.query(models.Timeseries).filter(models.Timeseries.station_id == station_id) \
                                             .filter(models.Timeseries.variable_id == variable_id) \
-                                            .filter(models.Timeseries.label == label.strip()) \
-                                            .first()
-    # there is a mismatch with additional_metadata
-    if db_object:
-        db_object.additional_metadata = str(db_object.additional_metadata).replace("'",'"')
-        # there is also a mismatch with coordinates and additional_metadata from station object
-        if isinstance(db_object.station.coordinates, (WKBElement, WKTElement)):
-            db_object.station.coordinates = get_coordinates_from_geom(db_object.station.coordinates)
-        db_object.station.additional_metadata = str(db_object.station.additional_metadata).replace("'",'"')
-    return db_object
+                                            .filter(models.Timeseries.label == label.strip()).all()
+    role_num = get_value_from_str(RC_enum, 'ResourceProvider')
+    for db_object in db_objects:
+        for role in db_object.roles:
+            contact = get_contact(db, contact_id=role.contact_id)
+            if ((contact.organisation.name == resource_provider) and (role_num == role.role)):
+                ret_db_object = db_object
+                # there is a mismatch with additional_metadata
+                ret_db_object.additional_metadata = str(ret_db_object.additional_metadata).replace("'",'"')
+                # there is also a mismatch with coordinates and additional_metadata from station object
+                if isinstance(ret_db_object.station.coordinates, (WKBElement, WKTElement)):
+                    ret_db_object.station.coordinates = get_coordinates_from_geom(ret_db_object.station.coordinates)
+                ret_db_object.station.additional_metadata = str(ret_db_object.station.additional_metadata).replace("'",'"')
+    return ret_db_object
 
 
 def get_timeseries_changelog(db: Session, timeseries_id: int):
diff --git a/toardb/timeseries/timeseries.py b/toardb/timeseries/timeseries.py
index df051c211ac2c1cd51f8c4ce174ae912ae3d55b1..01c63d7c1e2577ab3c771b86fbf98c87aced2d43 100644
--- a/toardb/timeseries/timeseries.py
+++ b/toardb/timeseries/timeseries.py
@@ -19,7 +19,7 @@ router = APIRouter()
 def get_all_timeseries(skip: int = 0, limit: int = None, db: Session = Depends(get_db)):
     return crud.get_all_timeseries(db, skip=skip, limit=limit)
 
-#get all metadata of one timeseries
+#get all metadata of one timeseries (given its ID)
 @router.get('/timeseries/{timeseries_id}', response_model=schemas.Timeseries)
 def get_timeseries(timeseries_id: int, db: Session = Depends(get_db)):
     db_timeseries = crud.get_timeseries(db, timeseries_id=timeseries_id)
@@ -27,6 +27,15 @@ def get_timeseries(timeseries_id: int, db: Session = Depends(get_db)):
         raise HTTPException(status_code=404, detail="Timeseries not found.")
     return db_timeseries
 
+#get all metadata of one timeseries (given its unique constraints)
+@router.get('/timeseries/unique/', response_model=schemas.Timeseries)
+def get_timeseries_by_unique_constraints(station_id: int, variable_id: int, resource_provider: str, label: str = '', db: Session = Depends(get_db)):
+    db_timeseries = crud.get_timeseries_by_unique_constraints(db, station_id=station_id,
+                        variable_id=variable_id, resource_provider=resource_provider, label=label)
+    if db_timeseries is None:
+        raise HTTPException(status_code=404, detail="Timeseries not found.")
+    return db_timeseries
+
 @router.get('/timeseries_changelog/{timeseries_id}', response_model=List[schemas.TimeseriesChangelog])
 def get_timeseries_changelog(timeseries_id: int, db: Session = Depends(get_db)):
     db_changelog = crud.get_timeseries_changelog(db, timeseries_id=timeseries_id)
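For completeness, the lookup that get_UBA_NRT_data_withAutoInsert.py performs against the new /timeseries/unique/ route boils down to a request like this (the ids are placeholders; a 404 with detail "Timeseries not found." tells the harvester that the series still has to be created):

import requests

TOAR_SERVICE_URL = 'https://join-dev.fz-juelich.de:8443/'

r = requests.get(TOAR_SERVICE_URL + 'timeseries/unique/',
                 params={'station_id': 678, 'variable_id': 5,      # hypothetical ids
                         'resource_provider': 'UBA', 'label': ''},
                 verify=False)
if r.status_code == 200:
    print('found timeseries', r.json()['id'])
else:
    print('not found:', r.json()['detail'])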