diff --git a/src/data_generator.py b/src/data_generator.py index 3d8a1c7c242da3d45a1b17361e210a016a419dd6..dcd02bdd9530901026176df6de533ac0c3e84114 100644 --- a/src/data_generator.py +++ b/src/data_generator.py @@ -19,9 +19,9 @@ class DataGenerator(keras.utils.Sequence): """ def __init__(self, data_path: str, network: str, stations: Union[str, List[str]], variables: List[str], - interpolate_dim: str, target_dim: str, target_var: str, interpolate_method: str = "linear", - limit_nan_fill: int = 1, window_history: int = 7, window_lead_time: int = 4, - transform_method: str = "standardise", **kwargs): + interpolate_dim: str, target_dim: str, target_var: str, station_type: str = None, + interpolate_method: str = "linear", limit_nan_fill: int = 1, window_history: int = 7, + window_lead_time: int = 4, transform_method: str = "standardise", **kwargs): self.data_path = os.path.abspath(data_path) self.network = network self.stations = helpers.to_list(stations) @@ -29,6 +29,7 @@ class DataGenerator(keras.utils.Sequence): self.interpolate_dim = interpolate_dim self.target_dim = target_dim self.target_var = target_var + self.station_type = station_type self.interpolate_method = interpolate_method self.limit_nan_fill = limit_nan_fill self.window_history = window_history @@ -41,8 +42,9 @@ class DataGenerator(keras.utils.Sequence): display all class attributes """ return f"DataGenerator(path='{self.data_path}', network='{self.network}', stations={self.stations}, " \ - f"variables={self.variables}, interpolate_dim='{self.interpolate_dim}', target_dim='{self.target_dim}'" \ - f", target_var='{self.target_var}', **{self.kwargs})" + f"variables={self.variables}, station_type='{self.station_type}', " \ + f"interpolate_dim='{self.interpolate_dim}', target_dim='{self.target_dim}', " \ + f"target_var='{self.target_var}', **{self.kwargs})" def __len__(self): """ @@ -94,7 +96,8 @@ class DataGenerator(keras.utils.Sequence): :return: preprocessed data as a DataPrep instance """ station = 
self.get_station_key(key) - data = DataPrep(self.data_path, self.network, station, self.variables, **self.kwargs) + data = DataPrep(self.data_path, self.network, station, self.variables, station_type=self.station_type, + **self.kwargs) data.interpolate(self.interpolate_dim, method=self.interpolate_method, limit=self.limit_nan_fill) data.transform("datetime", method=self.transform_method) data.make_history_window(self.interpolate_dim, self.window_history) diff --git a/src/data_preparation.py b/src/data_preparation.py index 873433f499f51c003988d8b33da7a525d14544fa..3030acfb041a536cc790a6519573db57915cd945 100644 --- a/src/data_preparation.py +++ b/src/data_preparation.py @@ -44,11 +44,13 @@ class DataPrep(object): """ - def __init__(self, path: str, network: str, station: Union[str, List[str]], variables: List[str], **kwargs): + def __init__(self, path: str, network: str, station: Union[str, List[str]], variables: List[str], + station_type: str = None, **kwargs): self.path = os.path.abspath(path) self.network = network self.station = helpers.to_list(station) self.variables = variables + self.station_type = station_type self.mean = None self.std = None self.history = None @@ -92,7 +94,8 @@ class DataPrep(object): :return: """ df_all = {} - df, meta = join.download_join(station_name=self.station, statvar=self.statistics_per_var) + df, meta = join.download_join(station_name=self.station, statvar=self.statistics_per_var, + station_type=self.station_type, network_name=self.network) df_all[self.station[0]] = df # convert df_all to xarray xarr = {k: xr.DataArray(v, dims=['datetime', 'variables']) for k, v in df_all.items()} diff --git a/src/join.py b/src/join.py index 2b13dcf41c5bc03e9dba274fd5e643c79b091cde..4f9f36f960bc5a757a70b39222fb183ccec7aa8f 100644 --- a/src/join.py +++ b/src/join.py @@ -3,7 +3,6 @@ __date__ = '2019-10-16' import requests -import json import logging import pandas as pd import datetime as dt @@ -13,12 +12,21 @@ from src import helpers join_url_base = 
'https://join.fz-juelich.de/services/rest/surfacedata/' -def download_join(station_name: Union[str, List[str]], statvar: dict) -> [pd.DataFrame, pd.DataFrame]: +class EmptyQueryResult(Exception): + """ + Exception that gets raised if a query to JOIN returns empty results. + """ + pass + + +def download_join(station_name: Union[str, List[str]], statvar: dict, station_type: str = None, network_name: str = None) -> [pd.DataFrame, pd.DataFrame]: """ read data from JOIN/TOAR :param station_name: Station name e.g. DEBY122 :param statvar: key as variable like 'O3', values as statistics on keys like 'mean' + :param station_type: restrict the query to this station type, e.g. 'background'; None applies no filter + :param network_name: restrict the query to this measurement network; None applies no filter :returns: - df - pandas df with all variables and statistics - meta - pandas df with all meta information @@ -27,7 +35,8 @@ def download_join(station_name: Union[str, List[str]], statvar: dict) -> [pd.Dat station_name = helpers.to_list(station_name) # load series information - opts = {'base': join_url_base, 'service': 'series', 'station_id': station_name[0]} + opts = {"base": join_url_base, "service": "series", "station_id": station_name[0], "station_type": station_type, + "network_name": network_name} url = create_url(**opts) response = requests.get(url) station_vars = response.json() @@ -65,7 +74,7 @@ def download_join(station_name: Union[str, List[str]], statvar: dict) -> [pd.Dat meta.columns = station_name return df, meta else: - raise ValueError("No data found in JOIN.") + raise EmptyQueryResult("No data found in JOIN.") def _correct_stat_name(stat: str) -> str: @@ -97,7 +106,7 @@ def create_url(base: str, service: str, **kwargs: Union[str, int, float]) -> str :param kwargs: keyword pairs for optional request specifications, e.g.
'statistics=maximum' :return: combined url as string """ - url = '{}{}/?'.format(base, service) + '&'.join('{}={}'.format(k, v) for k, v in kwargs.items()) + url = '{}{}/?'.format(base, service) + '&'.join('{}={}'.format(k, v) for k, v in kwargs.items() if v is not None) return url diff --git a/src/modules/experiment_setup.py b/src/modules/experiment_setup.py index a76fe60b34b679b5702ec85a11f95002c3c6fe34..a20f0b83e9828550d2f717502b5371c2c1ad7e9a 100644 --- a/src/modules/experiment_setup.py +++ b/src/modules/experiment_setup.py @@ -27,7 +27,7 @@ class ExperimentSetup(RunEnvironment): trainable: Train new model if true, otherwise try to load existing model """ - def __init__(self, parser_args=None, var_all_dict=None, stations=None, network=None, variables=None, + def __init__(self, parser_args=None, var_all_dict=None, stations=None, network=None, station_type=None, variables=None, statistics_per_var=None, start=None, end=None, window_history=None, target_var="o3", target_dim=None, window_lead_time=None, dimensions=None, interpolate_dim=None, interpolate_method=None, limit_nan_fill=None, train_start=None, train_end=None, val_start=None, val_end=None, test_start=None, @@ -53,6 +53,7 @@ class ExperimentSetup(RunEnvironment): self._set_param("var_all_dict", var_all_dict, default=DEFAULT_VAR_ALL_DICT) self._set_param("stations", stations, default=DEFAULT_STATIONS) self._set_param("network", network, default="AIRBASE") + self._set_param("station_type", station_type, default=None) self._set_param("variables", variables, default=list(self.data_store.get("var_all_dict", "general").keys())) self._set_param("statistics_per_var", statistics_per_var, default=self.data_store.get("var_all_dict", "general")) self._set_param("start", start, default="1997-01-01", scope="general") diff --git a/src/modules/modules.py b/src/modules/modules.py index 8532e1d812a5de7a3b47423d9f9bb3c9bcd43abc..033fd0779d8d140e684103b27fc7c025dedcdb81 100644 --- a/src/modules/modules.py +++ 
b/src/modules/modules.py @@ -1,8 +1,9 @@ import logging -# from src.experiment_setup import ExperimentSetup import argparse from src.modules.run_environment import RunEnvironment +from src.modules.experiment_setup import ExperimentSetup +from src.modules.pre_processing import PreProcessing class Training(RunEnvironment): @@ -28,6 +29,7 @@ if __name__ == "__main__": parser.add_argument('--experiment_date', metavar='--exp_date', type=str, nargs=1, default=None, help="set experiment date as string") parser_args = parser.parse_args() - # with run(): - # setup = ExperimentSetup(parser_args, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087']) - # PreProcessing(setup) + with RunEnvironment(): + ExperimentSetup(parser_args, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBW001'], + station_type='background') + PreProcessing() diff --git a/src/modules/pre_processing.py b/src/modules/pre_processing.py index d999217e9f903d3d67a24179c9f3654fee3e60d4..d3056f52bd0a60e0c9e7ed97fa593f3b596898a4 100644 --- a/src/modules/pre_processing.py +++ b/src/modules/pre_processing.py @@ -5,10 +5,11 @@ from src.data_generator import DataGenerator from src.helpers import TimeTracking from src.modules.run_environment import RunEnvironment from src.datastore import NameNotFoundInDataStore, NameNotFoundInScope +from src.join import EmptyQueryResult DEFAULT_ARGS_LIST = ["data_path", "network", "stations", "variables", "interpolate_dim", "target_dim", "target_var"] -DEFAULT_KWARGS_LIST = ["limit_nan_fill", "window_history", "window_lead_time", "statistics_per_var"] +DEFAULT_KWARGS_LIST = ["limit_nan_fill", "window_history", "window_lead_time", "statistics_per_var", "station_type"] class PreProcessing(RunEnvironment): @@ -110,7 +111,8 @@ class PreProcessing(RunEnvironment): valid_stations.append(station) logging.debug(f"{station}: history_shape = {history.shape}") logging.debug(f"{station}: loading time = {t_inner}") - except AttributeError: + except (AttributeError, 
EmptyQueryResult): continue - logging.info(f"run for {t_outer} to check {len(all_stations)} station(s)") + logging.info(f"run for {t_outer} to check {len(all_stations)} station(s). Found {len(valid_stations)}/" + f"{len(all_stations)} valid stations.") return valid_stations