diff --git a/mlair/configuration/defaults.py b/mlair/configuration/defaults.py
index 0c8254b3f6bd3e37f31b5101f4e8852615807648..ce42fc0eed6e891bc0a0625666da3dccfcc8a3ee 100644
--- a/mlair/configuration/defaults.py
+++ b/mlair/configuration/defaults.py
@@ -50,6 +50,9 @@ DEFAULT_PLOT_LIST = ["PlotMonthlySummary", "PlotStationMap", "PlotClimatological
                      "PlotCompetitiveSkillScore", "PlotBootstrapSkillScore", "PlotConditionalQuantiles",
                      "PlotAvailability", "PlotSeparationOfScales"]
 DEFAULT_SAMPLING = "daily"
+DEFAULT_DATA_ORIGIN = {"cloudcover": "REA", "humidity": "REA", "pblheight": "REA", "press": "REA", "relhum": "REA",
+                       "temp": "REA", "totprecip": "REA", "u": "REA", "v": "REA", "no": "", "no2": "", "o3": "",
+                       "pm10": "", "so2": ""}
 
 
 def get_defaults():
diff --git a/mlair/data_handler/data_handler_kz_filter.py b/mlair/data_handler/data_handler_kz_filter.py
index 6b960e79a14813c18f56a24642e78901bf687aad..adc5ee0e72694baed6ec0ab0c0bf9259126af292 100644
--- a/mlair/data_handler/data_handler_kz_filter.py
+++ b/mlair/data_handler/data_handler_kz_filter.py
@@ -40,7 +40,7 @@ class DataHandlerKzFilterSingleStation(DataHandlerSingleStation):
         Setup samples. This method prepares and creates samples X, and labels Y.
         """
         data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling,
-                                         self.station_type, self.network, self.store_data_locally)
+                                         self.station_type, self.network, self.store_data_locally, self.data_origin)
         self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method,
                                       limit=self.interpolation_limit)
         self.set_inputs_and_targets()
diff --git a/mlair/data_handler/data_handler_mixed_sampling.py b/mlair/data_handler/data_handler_mixed_sampling.py
index 74e1b42f465900c00ee11d06c61354d06ab31d37..aa1f0d55b55757875b640de00f66e62dd3586b11 100644
--- a/mlair/data_handler/data_handler_mixed_sampling.py
+++ b/mlair/data_handler/data_handler_mixed_sampling.py
@@ -37,7 +37,8 @@ class DataHandlerMixedSamplingSingleStation(DataHandlerSingleStation):
 
     def load_and_interpolate(self, ind) -> [xr.DataArray, pd.DataFrame]:
         data, self.meta = self.load_data(self.path[ind], self.station, self.statistics_per_var, self.sampling[ind],
-                                         self.station_type, self.network, self.store_data_locally, self.start, self.end)
+                                         self.station_type, self.network, self.store_data_locally, self.data_origin,
+                                         self.start, self.end)
         data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method,
                                 limit=self.interpolation_limit)
         return data
@@ -110,7 +111,8 @@ class DataHandlerMixedSamplingWithFilterSingleStation(DataHandlerMixedSamplingSi
         start, end = self.start, self.end
         data, self.meta = self.load_data(self.path[ind], self.station, self.statistics_per_var, self.sampling[ind],
-                                         self.station_type, self.network, self.store_data_locally, start, end)
+                                         self.station_type, self.network, self.store_data_locally, self.data_origin,
+                                         start, end)
         data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method,
                                 limit=self.interpolation_limit)
         return data
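Note on the defaults change above: DEFAULT_DATA_ORIGIN maps each variable to the origin of the series that should be fetched from JOIN, "REA" for reanalysis products and "" (empty string) for observations. A minimal sketch of how a caller could override a single entry while keeping the rest of the shipped defaults (the merge idiom is illustrative, not part of this change):

    from mlair.configuration.defaults import DEFAULT_DATA_ORIGIN

    # request observational temperature instead of the reanalysis series;
    # all other variables keep their shipped default origin
    data_origin = {**DEFAULT_DATA_ORIGIN, "temp": ""}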
diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py
index 4c274f913ac6668a84793c1d0628728334601d51..e554a3b32d8e4e2f5482a388374cfba87f7add15 100644
--- a/mlair/data_handler/data_handler_single_station.py
+++ b/mlair/data_handler/data_handler_single_station.py
@@ -49,11 +49,12 @@ class DataHandlerSingleStation(AbstractDataHandler):
                  window_history_size=DEFAULT_WINDOW_HISTORY_SIZE, window_lead_time=DEFAULT_WINDOW_LEAD_TIME,
                  interpolation_limit: int = 0, interpolation_method: str = DEFAULT_INTERPOLATION_METHOD,
                  overwrite_local_data: bool = False, transformation=None, store_data_locally: bool = True,
-                 min_length: int = 0, start=None, end=None, variables=None, **kwargs):
+                 min_length: int = 0, start=None, end=None, variables=None, data_origin: Dict = None, **kwargs):
         super().__init__()  # path, station, statistics_per_var, transformation, **kwargs)
         self.station = helpers.to_list(station)
         self.path = self.setup_data_path(data_path, sampling)
         self.statistics_per_var = statistics_per_var
+        self.data_origin = data_origin
         self.do_transformation = transformation is not None
         self.input_data, self.target_data = self.setup_transformation(transformation)
@@ -142,7 +143,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
         Setup samples. This method prepares and creates samples X, and labels Y.
         """
         data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling,
-                                         self.station_type, self.network, self.store_data_locally, self.start, self.end)
+                                         self.station_type, self.network, self.store_data_locally, self.data_origin,
+                                         self.start, self.end)
         self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method,
                                       limit=self.interpolation_limit)
         self.set_inputs_and_targets()
@@ -163,7 +165,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
         self.remove_nan(self.time_dim)
 
     def load_data(self, path, station, statistics_per_var, sampling, station_type=None, network=None,
-                  store_data_locally=False, start=None, end=None):
+                  store_data_locally=False, data_origin: Dict = None, start=None, end=None):
         """
         Load data and meta data either from local disk (preferred) or download new data by using a custom download
         method.
@@ -182,7 +184,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
                 os.remove(meta_file)
             data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling,
                                             station_type=station_type, network=network,
-                                            store_data_locally=store_data_locally)
+                                            store_data_locally=store_data_locally, data_origin=data_origin)
             logging.debug(f"loaded new data")
         else:
             try:
@@ -196,7 +198,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
                 logging.debug(f"load new data")
                 data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling,
                                                 station_type=station_type, network=network,
-                                                store_data_locally=store_data_locally)
+                                                store_data_locally=store_data_locally, data_origin=data_origin)
                 logging.debug("loading finished")
         # create slices and check for negative concentration.
         data = self._slice_prep(data, start=start, end=end)
@@ -205,8 +207,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
 
     @staticmethod
     def download_data_from_join(file_name: str, meta_file: str, station, statistics_per_var, sampling,
-                                station_type=None, network=None, store_data_locally=True) -> [xr.DataArray,
-                                                                                              pd.DataFrame]:
+                                station_type=None, network=None, store_data_locally=True, data_origin: Dict = None) \
+                                -> [xr.DataArray, pd.DataFrame]:
         """
         Download data from TOAR database using the JOIN interface.
@@ -220,7 +222,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
         """
         df_all = {}
         df, meta = join.download_join(station_name=station, stat_var=statistics_per_var, station_type=station_type,
-                                      network_name=network, sampling=sampling)
+                                      network_name=network, sampling=sampling, data_origin=data_origin)
         df_all[station[0]] = df
         # convert df_all to xarray
         xarr = {k: xr.DataArray(v, dims=['datetime', 'variables']) for k, v in df_all.items()}
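Because data_origin was inserted before start and end in load_data's signature, every positional call site had to be adapted (see the handler hunks above). A small, self-contained sketch of the pitfall, using a toy stand-in for the method; keyword arguments keep call sites robust against this kind of reordering:

    from typing import Dict

    # toy stand-in for DataHandlerSingleStation.load_data after this change;
    # data_origin now sits between store_data_locally and start/end
    def load_data(path, station, statistics_per_var, sampling, station_type=None, network=None,
                  store_data_locally=False, data_origin: Dict = None, start=None, end=None):
        return data_origin, start, end

    # passing start/end positionally in the old order would silently bind start
    # to data_origin; keywords avoid that
    print(load_data("data/daily", ["DEBW107"], {"o3": "dma8eu"}, "daily",
                    data_origin={"temp": "REA"}, start="1997-01-01", end="2017-12-31"))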
diff --git a/mlair/helpers/join.py b/mlair/helpers/join.py
index 4683ba2acd6ae5d4090591234b50c3bfae27bffe..43a0176811b54fba2983c1dba108f4c7977f1431 100644
--- a/mlair/helpers/join.py
+++ b/mlair/helpers/join.py
@@ -23,7 +23,8 @@ class EmptyQueryResult(Exception):
 
 
 def download_join(station_name: Union[str, List[str]], stat_var: dict, station_type: str = None,
-                  network_name: str = None, sampling: str = "daily") -> [pd.DataFrame, pd.DataFrame]:
+                  network_name: str = None, sampling: str = "daily", data_origin: Dict = None) -> [pd.DataFrame,
+                                                                                                   pd.DataFrame]:
     """
     Read data from JOIN/TOAR.
 
@@ -32,6 +33,8 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
     :param station_type: set the station type like "traffic" or "background", can be none
     :param network_name: set the measurement network like "UBA" or "AIRBASE", can be none
     :param sampling: sampling rate of the downloaded data, either set to daily or hourly (default daily)
+    :param data_origin: dictionary specifying the data origin for each variable as (variable, origin) pairs. Valid
+        origins are "REA" for reanalysis data and "" (empty string) for observational data.
 
     :returns: data frame with all variables and statistics and meta data frame with all meta information
     """
@@ -42,7 +45,7 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
     join_url_base, headers = join_settings(sampling)
 
     # load series information
-    vars_dict = load_series_information(station_name, station_type, network_name, join_url_base, headers)
+    vars_dict = load_series_information(station_name, station_type, network_name, join_url_base, headers, data_origin)
 
     # correct stat_var values if data is not aggregated (hourly)
     if sampling == "hourly":
@@ -123,7 +126,7 @@ def get_data(opts: Dict, headers: Dict) -> Union[Dict, List]:
 
 
 def load_series_information(station_name: List[str], station_type: str_or_none, network_name: str_or_none,
-                            join_url_base: str, headers: Dict) -> Dict:
+                            join_url_base: str, headers: Dict, data_origin: Dict = None) -> Dict:
     """
     List all series ids that are available for given station id and network name.
 
@@ -132,15 +135,36 @@ def load_series_information(station_name: List[str], station_type: str_or_none,
     :param network_name: measurement network of the station like "UBA" or "AIRBASE"
     :param join_url_base: base url name to download data from
     :param headers: additional headers information like authorization, can be empty
+    :param data_origin: additional information to select a distinct series, e.g. from reanalysis (REA) or from
+        observation ("", empty string). This dictionary should contain a key for each variable and the origin as value.
 
     :return: all available series for requested station stored in an dictionary with parameter name (variable) as key
        and the series id as value.
     """
-    opts = {"base": join_url_base, "service": "series", "station_id": station_name[0], "station_type": station_type,
-            "network_name": network_name}
+    opts = {"base": join_url_base, "service": "search", "station_id": station_name[0], "station_type": station_type,
+            "network_name": network_name, "as_dict": "true",
+            "columns": "id,network_name,station_id,parameter_name,parameter_label,parameter_attribute"}
     station_vars = get_data(opts, headers)
     logging.debug(f"{station_name}: {station_vars}")  # ToDo start here for #206
-    vars_dict = {item[3].lower(): item[0] for item in station_vars}
-    return vars_dict
+    return _select_distinct_series(station_vars, data_origin)
+
+
+def _select_distinct_series(vars: List[Dict], data_origin: Dict = None):
+    """
+    Select a distinct series id per variable. Also check whether a parameter stems from REA or from observation.
+    """
+    if data_origin is None:
+        data_origin = {"cloudcover": "REA", "humidity": "REA", "pblheight": "REA", "press": "REA", "relhum": "REA",
+                       "temp": "REA", "totprecip": "REA", "u": "REA", "v": "REA",
+                       "no": "", "no2": "", "o3": "", "pm10": "", "so2": ""}
+    # ToDo: maybe press, wdir, wspeed from obs? or also temp, ... ?
+    selected = {}
+    for var in vars:
+        name = var["parameter_name"].lower()
+        var_attr = var["parameter_attribute"].lower()
+        attr = data_origin.get(name, "").lower()
+        if var_attr == attr:
+            selected[name] = var["id"]
+    return selected
 
 
 def _save_to_pandas(df: Union[pd.DataFrame, None], data: dict, stat: str, var: str) -> pd.DataFrame:
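To make the selection rule concrete: _select_distinct_series keeps, per variable, the series whose parameter_attribute matches the requested origin, and falls back to "" (observation) for variables missing from data_origin. A self-contained illustration with made-up series records (real ones come from the JOIN "search" service):

    from mlair.helpers.join import _select_distinct_series

    series = [{"id": 1, "parameter_name": "temp", "parameter_attribute": ""},
              {"id": 2, "parameter_name": "temp", "parameter_attribute": "REA"}]

    print(_select_distinct_series(series))                            # {'temp': 2}, default requests REA for temp
    print(_select_distinct_series(series, data_origin={"temp": ""}))  # {'temp': 1}, observation requested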
""" - opts = {"base": join_url_base, "service": "series", "station_id": station_name[0], "station_type": station_type, - "network_name": network_name} + opts = {"base": join_url_base, "service": "search", "station_id": station_name[0], "station_type": station_type, + "network_name": network_name, "as_dict": "true", + "columns": "id,network_name,station_id,parameter_name,parameter_label,parameter_attribute"} station_vars = get_data(opts, headers) logging.debug(f"{station_name}: {station_vars}") # ToDo start here for #206 - vars_dict = {item[3].lower(): item[0] for item in station_vars} - return vars_dict + return _select_distinct_series(station_vars, data_origin) + + +def _select_distinct_series(vars: List[Dict], data_origin: Dict = None): + """ + Select distinct series ids for all variables. Also check if a parameter is from REA or not. + """ + if data_origin is None: + data_origin = {"cloudcover": "REA", "humidity": "REA", "pblheight": "REA", "press": "REA", "relhum": "REA", + "temp": "REA", "totprecip": "REA", "u": "REA", "v": "REA", + "no": "", "no2": "", "o3": "", "pm10": "", "so2": ""} + # ToDo: maybe press, wdir, wspeed from obs? or also temp, ... ? + selected = {} + for var in vars: + name = var["parameter_name"].lower() + var_attr = var["parameter_attribute"].lower() + attr = data_origin.get(name, "").lower() + if var_attr == attr: + selected[name] = var["id"] + return selected def _save_to_pandas(df: Union[pd.DataFrame, None], data: dict, stat: str, var: str) -> pd.DataFrame: diff --git a/mlair/run_modules/experiment_setup.py b/mlair/run_modules/experiment_setup.py index 7fb19e29baed5709e30e4069aa3d681f04e38267..9a9253eda522c39f348dd96700ed38730e87f9a8 100644 --- a/mlair/run_modules/experiment_setup.py +++ b/mlair/run_modules/experiment_setup.py @@ -17,7 +17,7 @@ from mlair.configuration.defaults import DEFAULT_STATIONS, DEFAULT_VAR_ALL_DICT, DEFAULT_TRAIN_START, DEFAULT_TRAIN_END, DEFAULT_TRAIN_MIN_LENGTH, DEFAULT_VAL_START, DEFAULT_VAL_END, \ DEFAULT_VAL_MIN_LENGTH, DEFAULT_TEST_START, DEFAULT_TEST_END, DEFAULT_TEST_MIN_LENGTH, DEFAULT_TRAIN_VAL_MIN_LENGTH, \ DEFAULT_USE_ALL_STATIONS_ON_ALL_DATA_SETS, DEFAULT_EVALUATE_BOOTSTRAPS, DEFAULT_CREATE_NEW_BOOTSTRAPS, \ - DEFAULT_NUMBER_OF_BOOTSTRAPS, DEFAULT_PLOT_LIST, DEFAULT_SAMPLING + DEFAULT_NUMBER_OF_BOOTSTRAPS, DEFAULT_PLOT_LIST, DEFAULT_SAMPLING, DEFAULT_DATA_ORIGIN from mlair.data_handler import DefaultDataHandler from mlair.run_modules.run_environment import RunEnvironment from mlair.model_modules.model_class import MyLittleModel as VanillaModel @@ -226,7 +226,7 @@ class ExperimentSetup(RunEnvironment): number_of_bootstraps=None, create_new_bootstraps=None, data_path: str = None, batch_path: str = None, login_nodes=None, hpc_hosts=None, model=None, batch_size=None, epochs=None, data_handler=None, sampling_inputs=None, - sampling_outputs=None, **kwargs): + sampling_outputs=None, data_origin: Dict = None, **kwargs): # create run framework super().__init__() @@ -288,6 +288,7 @@ class ExperimentSetup(RunEnvironment): self._set_param("stations", stations, default=DEFAULT_STATIONS, apply=helpers.to_list) self._set_param("statistics_per_var", statistics_per_var, default=DEFAULT_VAR_ALL_DICT) self._set_param("variables", variables, default=list(self.data_store.get("statistics_per_var").keys())) + self._set_param("data_origin", data_origin, default=DEFAULT_DATA_ORIGIN) self._set_param("start", start, default=DEFAULT_START) self._set_param("end", end, default=DEFAULT_END) self._set_param("window_history_size", window_history_size, 
diff --git a/run_mixed_sampling.py b/run_mixed_sampling.py
index 5288063ac583e8dad24e253c5ae16810b540c5c8..a70e2aa36e5c1da83c4f667fbbe8b27b5949b4d6 100644
--- a/run_mixed_sampling.py
+++ b/run_mixed_sampling.py
@@ -22,6 +22,7 @@ def main(parser_args):
                 test_end="2011-12-31",
                 stations=["DEBW107", "DEBW013"],
                 epochs=100,
+                network="UBA",
                 )
     workflow = DefaultWorkflow(**args)
     workflow.run()
diff --git a/test/test_join.py b/test/test_join.py
index 791723335e16cf2124512629414ebe626bc20e9c..a9a4c381cbf58a272389b0b11283c8b0cce3ab42 100644
--- a/test/test_join.py
+++ b/test/test_join.py
@@ -3,7 +3,7 @@ from typing import Iterable
 import pytest
 
 from mlair.helpers.join import *
-from mlair.helpers.join import _save_to_pandas, _correct_stat_name, _lower_list
+from mlair.helpers.join import _save_to_pandas, _correct_stat_name, _lower_list, _select_distinct_series
 from mlair.configuration.join_settings import join_settings
 
 
@@ -52,7 +52,7 @@ class TestGetData:
 class TestLoadSeriesInformation:
 
     def test_standard_query(self):
-        expected_subset = {'o3': 23031, 'no2': 39002, 'temp--lubw': 17059, 'wspeed': 17060}
+        expected_subset = {'o3': 23031, 'no2': 39002, 'temp': 85584, 'wspeed': 17060}
         assert expected_subset.items() <= load_series_information(['DEBW107'], None, None, join_settings()[0],
                                                                   {}).items()
 
@@ -60,6 +60,38 @@ class TestLoadSeriesInformation:
         assert load_series_information(['DEBW107'], "traffic", None, join_settings()[0], {}) == {}
 
 
+class TestSelectDistinctSeries:
+
+    @pytest.fixture
+    def vars(self):
+        return [{'id': 16686, 'network_name': 'UBA', 'station_id': 'DENW053', 'parameter_name': 'no2',
+                 'parameter_label': 'NO2', 'parameter_attribute': ''},
+                {'id': 16687, 'network_name': 'UBA', 'station_id': 'DENW053', 'parameter_name': 'o3',
+                 'parameter_label': 'O3', 'parameter_attribute': ''},
+                {'id': 16692, 'network_name': 'UBA', 'station_id': 'DENW053', 'parameter_name': 'press',
+                 'parameter_label': 'PRESS--LANUV', 'parameter_attribute': ''},
+                {'id': 16693, 'network_name': 'UBA', 'station_id': 'DENW053', 'parameter_name': 'temp',
+                 'parameter_label': 'TEMP--LANUV', 'parameter_attribute': ''},
+                {'id': 54036, 'network_name': 'UBA', 'station_id': 'DENW053', 'parameter_name': 'cloudcover',
+                 'parameter_label': 'CLOUDCOVER', 'parameter_attribute': 'REA'},
+                {'id': 88491, 'network_name': 'UBA', 'station_id': 'DENW053', 'parameter_name': 'temp',
+                 'parameter_label': 'TEMP-REA-MIUB', 'parameter_attribute': 'REA'},
+                {'id': 102660, 'network_name': 'UBA', 'station_id': 'DENW053', 'parameter_name': 'press',
+                 'parameter_label': 'PRESS-REA-MIUB', 'parameter_attribute': 'REA'}]
+
+    def test_no_origin_given(self, vars):
+        res = _select_distinct_series(vars)
+        assert res == {"no2": 16686, "o3": 16687, "cloudcover": 54036, "temp": 88491, "press": 102660}
+
+    def test_different_origins(self, vars):
+        origin = {"no2": "test", "temp": "", "cloudcover": "REA"}
+        res = _select_distinct_series(vars, data_origin=origin)
+        assert res == {"o3": 16687, "press": 16692, "temp": 16693, "cloudcover": 54036}
+        res = _select_distinct_series(vars, data_origin={})
+        assert res == {"no2": 16686, "o3": 16687, "press": 16692, "temp": 16693}
+
+
 class TestSaveToPandas:
 
     @staticmethod
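The new TestSelectDistinctSeries cases run entirely offline on the fixture data, while TestLoadSeriesInformation still queries the live JOIN database (the expected temp series id 85584 depends on its current state). To exercise only the new unit, something like:

    import pytest

    # run just the offline selection tests; the node-id syntax is standard pytest
    pytest.main(["test/test_join.py::TestSelectDistinctSeries", "-v"])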