Commit afd91421 authored by lukas leufen

remove station_type parameter, add stats_var to meta check

parent 47a44e94
Pipeline #105065 failed with stages in 16 minutes and 28 seconds
@@ -63,8 +63,7 @@ class DataHandlerMixedSamplingSingleStation(DataHandlerSingleStation):
 
         vars = [self.variables, self.target_var]
         stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind])
         data, self.meta = self.load_data(self.path[ind], self.station, stats_per_var, self.sampling[ind],
-                                         self.station_type, self.store_data_locally, self.data_origin, self.start,
-                                         self.end)
+                                         self.store_data_locally, self.data_origin, self.start, self.end)
         data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method[ind],
                                 limit=self.interpolation_limit[ind], sampling=self.sampling[ind])
@@ -147,7 +146,7 @@ class DataHandlerMixedSamplingWithFilterSingleStation(DataHandlerMixedSamplingSingleStation):
 
         stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind])
         data, self.meta = self.load_data(self.path[ind], self.station, stats_per_var, self.sampling[ind],
-                                         self.station_type, self.store_data_locally, self.data_origin, start, end)
+                                         self.store_data_locally, self.data_origin, start, end)
         data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method[ind],
                                 limit=self.interpolation_limit[ind], sampling=self.sampling[ind])
         return data
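
Aside on the two hunks above: helpers.select_from_dict restricts the full statistics_per_var mapping to the input variables (ind = 0) or the target variable (ind = 1) before each load_data call. A minimal stand-in for that helper, assuming it simply filters a dict by the requested keys (the real implementation lives in MLAir's helpers module):

    # Illustrative stand-in for helpers.select_from_dict: keep only the
    # entries whose keys appear in the requested variable set.
    def select_from_dict(d: dict, keys) -> dict:
        keys = keys if isinstance(keys, (list, tuple, set)) else [keys]
        return {k: v for k, v in d.items() if k in keys}

    statistics_per_var = {"o3": "dma8eu", "temp": "maximum", "relhum": "average_values"}
    print(select_from_dict(statistics_per_var, ["temp", "relhum"]))  # input stats
    print(select_from_dict(statistics_per_var, "o3"))                # target stats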
@@ -39,7 +39,6 @@ class DataHandlerSingleStation(AbstractDataHandler):
         indicates that not all values up to t0 are used, a positive value indicates usage of values at t>t0. Default
         is 0.
     """
-    DEFAULT_STATION_TYPE = "background"
     DEFAULT_VAR_ALL_DICT = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values',
                             'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values',
                             'pblheight': 'maximum'}
@@ -58,16 +57,15 @@ class DataHandlerSingleStation(AbstractDataHandler):
     chem_vars = ["benzene", "ch4", "co", "ethane", "no", "no2", "nox", "o3", "ox", "pm1", "pm10", "pm2p5", "propane",
                  "so2", "toluene"]
 
-    _hash = ["station", "statistics_per_var", "data_origin", "station_type", "sampling", "target_dim", "target_var",
-             "time_dim", "iter_dim", "window_dim", "window_history_size", "window_history_offset", "window_lead_time",
+    _hash = ["station", "statistics_per_var", "data_origin", "sampling", "target_dim", "target_var", "time_dim",
+             "iter_dim", "window_dim", "window_history_size", "window_history_offset", "window_lead_time",
              "interpolation_limit", "interpolation_method", "variables", "window_history_end"]
 
-    def __init__(self, station, data_path, statistics_per_var=None, station_type=DEFAULT_STATION_TYPE,
-                 sampling: Union[str, Tuple[str]] = DEFAULT_SAMPLING, target_dim=DEFAULT_TARGET_DIM,
-                 target_var=DEFAULT_TARGET_VAR, time_dim=DEFAULT_TIME_DIM, iter_dim=DEFAULT_ITER_DIM,
-                 window_dim=DEFAULT_WINDOW_DIM, window_history_size=DEFAULT_WINDOW_HISTORY_SIZE,
-                 window_history_offset=DEFAULT_WINDOW_HISTORY_OFFSET, window_history_end=DEFAULT_WINDOW_HISTORY_END,
-                 window_lead_time=DEFAULT_WINDOW_LEAD_TIME,
+    def __init__(self, station, data_path, statistics_per_var=None, sampling: Union[str, Tuple[str]] = DEFAULT_SAMPLING,
+                 target_dim=DEFAULT_TARGET_DIM, target_var=DEFAULT_TARGET_VAR, time_dim=DEFAULT_TIME_DIM,
+                 iter_dim=DEFAULT_ITER_DIM, window_dim=DEFAULT_WINDOW_DIM,
+                 window_history_size=DEFAULT_WINDOW_HISTORY_SIZE, window_history_offset=DEFAULT_WINDOW_HISTORY_OFFSET,
+                 window_history_end=DEFAULT_WINDOW_HISTORY_END, window_lead_time=DEFAULT_WINDOW_LEAD_TIME,
                  interpolation_limit: Union[int, Tuple[int]] = DEFAULT_INTERPOLATION_LIMIT,
                  interpolation_method: Union[str, Tuple[str]] = DEFAULT_INTERPOLATION_METHOD,
                  overwrite_local_data: bool = False, transformation=None, store_data_locally: bool = True,
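
Note that the constructor no longer accepts station_type at all, so a hypothetical caller that still passes it fails fast (station ID, path, and statistics below are made-up values):

    dh = DataHandlerSingleStation(station=["XX0001"], data_path="data/",
                                  statistics_per_var={"o3": "dma8eu"},
                                  station_type="background")
    # TypeError: __init__() got an unexpected keyword argument 'station_type'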
@@ -87,7 +85,6 @@ class DataHandlerSingleStation(AbstractDataHandler):
 
         self.input_data, self.target_data = None, None
         self._transformation = self.setup_transformation(transformation)
-        self.station_type = station_type
         self.sampling = sampling
         self.target_dim = target_dim
         self.target_var = target_var
@@ -140,7 +137,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
 
     def __repr__(self):
         return f"StationPrep(station={self.station}, data_path='{self.path}', data_origin={self.data_origin}, " \
-               f"statistics_per_var={self.statistics_per_var}, station_type='{self.station_type}', " \
+               f"statistics_per_var={self.statistics_per_var}, " \
                f"sampling='{self.sampling}', target_dim='{self.target_dim}', target_var='{self.target_var}', " \
                f"time_dim='{self.time_dim}', window_history_size={self.window_history_size}, " \
                f"window_lead_time={self.window_lead_time}, interpolation_limit={self.interpolation_limit}, " \
@@ -303,8 +300,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
 
     def make_input_target(self):
         data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling,
-                                         self.station_type, self.store_data_locally, self.data_origin,
-                                         self.start, self.end)
+                                         self.store_data_locally, self.data_origin, self.start, self.end)
         self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method,
                                       limit=self.interpolation_limit, sampling=self.sampling)
         self.set_inputs_and_targets()
@@ -322,7 +318,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
         self.make_observation(self.target_dim, self.target_var, self.time_dim)
         self.remove_nan(self.time_dim)
 
-    def load_data(self, path, station, statistics_per_var, sampling, station_type=None, store_data_locally=False,
+    def load_data(self, path, station, statistics_per_var, sampling, store_data_locally=False,
                   data_origin: Dict = None, start=None, end=None):
         """
         Load data and meta data either from local disk (preferred) or download new data by using a custom download method.
@@ -341,31 +337,30 @@ class DataHandlerSingleStation(AbstractDataHandler):
             if os.path.exists(meta_file):
                 os.remove(meta_file)
             data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling,
-                                            station_type=station_type, store_data_locally=store_data_locally,
-                                            data_origin=data_origin, time_dim=self.time_dim, target_dim=self.target_dim,
-                                            iter_dim=self.iter_dim)
+                                            store_data_locally=store_data_locally, data_origin=data_origin,
+                                            time_dim=self.time_dim, target_dim=self.target_dim, iter_dim=self.iter_dim)
             logging.debug(f"loaded new data")
         else:
             try:
                 logging.debug(f"try to load local data from: {file_name}")
                 data = xr.open_dataarray(file_name)
                 meta = pd.read_csv(meta_file, index_col=0)
-                self.check_station_meta(meta, station, station_type, data_origin)
+                self.check_station_meta(meta, station, data_origin, statistics_per_var)
                 logging.debug("loading finished")
             except FileNotFoundError as e:
                 logging.debug(e)
                 logging.debug(f"load new data")
                 data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling,
-                                                station_type=station_type, store_data_locally=store_data_locally,
-                                                data_origin=data_origin, time_dim=self.time_dim,
-                                                target_dim=self.target_dim, iter_dim=self.iter_dim)
+                                                store_data_locally=store_data_locally, data_origin=data_origin,
+                                                time_dim=self.time_dim, target_dim=self.target_dim,
+                                                iter_dim=self.iter_dim)
                 logging.debug("loading finished")
         # create slices and check for negative concentration.
         data = self._slice_prep(data, start=start, end=end)
         data = self.check_for_negative_concentrations(data)
         return data, meta
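
The structure worth noting in load_data: an invalid cached meta file is treated exactly like a missing file, because check_station_meta raises FileNotFoundError and lands in the same download branch as a true cache miss. A condensed sketch of that policy, standalone with the I/O passed in as callables (not the verbatim method):

    # Sketch of load_data's cache policy: prefer local files, but fall back
    # to a fresh download when files are absent or the stored meta is stale.
    def load_or_download(read_local, check_meta, download, overwrite=False):
        if overwrite:
            return download()              # unconditional refresh
        try:
            data, meta = read_local()      # xr.open_dataarray / pd.read_csv in the real code
            check_meta(meta)               # raises FileNotFoundError on mismatch
            return data, meta
        except FileNotFoundError:
            return download()              # cache miss or stale meta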
 
-    def download_data(self, file_name: str, meta_file: str, station, statistics_per_var, sampling, station_type=None,
+    def download_data(self, file_name: str, meta_file: str, station, statistics_per_var, sampling,
                       store_data_locally=True, data_origin: Dict = None, time_dim=DEFAULT_TIME_DIM,
                       target_dim=DEFAULT_TARGET_DIM, iter_dim=DEFAULT_ITER_DIM) -> [xr.DataArray, pd.DataFrame]:
         """
@@ -401,8 +396,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
         if toar_origin is None or len(toar_stats) > 0:
             # load combined data from toar-data (v2 & v1)
             df_toar, meta_toar = data_sources.toar_data.download_toar(station=station, toar_stats=toar_stats,
-                                                                      sampling=sampling, data_origin=toar_origin,
-                                                                      station_type=station_type)
+                                                                      sampling=sampling, data_origin=toar_origin)
 
         df = pd.concat([df_era5, df_toar], axis=1, sort=True)
         if meta_era5 is not None and meta_toar is not None:
@@ -410,6 +404,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
         else:
             meta = meta_era5 if meta_era5 is not None else meta_toar
         meta.loc["data_origin"] = str(data_origin)
+        meta.loc["statistics_per_var"] = str(statistics_per_var)
 
         df_all[station[0]] = df
         # convert df_all to xarray
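
The added row mirrors the existing data_origin entry: both are stringified into the meta CSV so the next run can compare them against the current request. A small demonstration of the round trip (station column name is made up):

    import io
    import pandas as pd

    # meta holds one column per station; bookkeeping entries are plain rows.
    meta = pd.DataFrame({"XX0001": {"station_name": "some station"}})
    meta.loc["data_origin"] = str({"o3": ""})
    meta.loc["statistics_per_var"] = str({"o3": "dma8eu"})

    # Written to and re-read from CSV, the rows survive as strings, which is
    # why check_station_meta compares against str(...) of the request.
    buf = io.StringIO()
    meta.to_csv(buf)
    buf.seek(0)
    restored = pd.read_csv(buf, index_col=0)
    print(restored.at["statistics_per_var", "XX0001"])  # {'o3': 'dma8eu'}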
@@ -422,22 +417,21 @@ class DataHandlerSingleStation(AbstractDataHandler):
         return xarr, meta
 
     @staticmethod
-    def check_station_meta(meta, station, station_type, data_origin):
+    def check_station_meta(meta, station, data_origin, statistics_per_var):
         """
         Search for the entries in meta data and compare the values with the requested ones.
 
         Will raise a FileNotFoundError if the values mismatch.
         """
-        if station_type is not None:
-            check_dict = {"station_type": station_type, "type": station_type, "data_origin": str(data_origin)}
-            for (k, v) in check_dict.items():
-                if v is None or k not in meta.index:
-                    continue
-                if meta.at[k, station[0]] != v:
-                    logging.debug(f"meta data does not agree with given request for {k}: {v} (requested) != "
-                                  f"{meta.at[k, station[0]]} (local). Raise FileNotFoundError to trigger new "
-                                  f"grabbing from web.")
-                    raise FileNotFoundError
+        check_dict = {"data_origin": str(data_origin), "statistics_per_var": str(statistics_per_var)}
+        for (k, v) in check_dict.items():
+            if v is None or k not in meta.index:
+                continue
+            if meta.at[k, station[0]] != v:
+                logging.debug(f"meta data does not agree with given request for {k}: {v} (requested) != "
+                              f"{meta.at[k, station[0]]} (local). Raise FileNotFoundError to trigger new "
+                              f"grabbing from web.")
+                raise FileNotFoundError
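
To see the new check in action, a hypothetical run against a cached meta frame whose stored statistics no longer match the request (standalone copy of the loop above; station ID is made up):

    import pandas as pd

    def check_station_meta(meta, station, data_origin, statistics_per_var):
        # standalone copy of the method body above, for demonstration
        check_dict = {"data_origin": str(data_origin), "statistics_per_var": str(statistics_per_var)}
        for k, v in check_dict.items():
            if v is None or k not in meta.index:
                continue
            if meta.at[k, station[0]] != v:
                raise FileNotFoundError  # load_data catches this and re-downloads

    meta = pd.DataFrame({"XX0001": {"data_origin": str({"o3": ""}),
                                    "statistics_per_var": str({"o3": "dma8eu"})}})
    check_station_meta(meta, ["XX0001"], {"o3": ""}, {"o3": "dma8eu"})  # passes silently
    try:
        check_station_meta(meta, ["XX0001"], {"o3": ""}, {"o3": "maximum"})
    except FileNotFoundError:
        print("statistics changed -> cached data gets re-downloaded")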
 
     def check_for_negative_concentrations(self, data: xr.DataArray, minimum: int = 0) -> xr.DataArray:
         """
@@ -68,8 +68,7 @@ class DataHandlerFilterSingleStation(DataHandlerSingleStation):
 
     def make_input_target(self):
         data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling,
-                                         self.station_type, self.store_data_locally, self.data_origin, self.start,
-                                         self.end)
+                                         self.store_data_locally, self.data_origin, self.start, self.end)
         self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method,
                                       limit=self.interpolation_limit)
         self.set_inputs_and_targets()
@@ -76,7 +76,7 @@ def retries_session(max_retries=3):
     return http
 
 
-def download_toar(station, toar_stats, sampling, data_origin, station_type=None):
+def download_toar(station, toar_stats, sampling, data_origin):
     try:
         # load data from toar-data (v2)
@@ -87,7 +87,7 @@ def download_toar(station, toar_stats, sampling, data_origin, station_type=None):
 
     try:
         # load join data (toar-data v1)
         df_join, meta_join = join.download_join(station_name=station, stat_var=toar_stats, sampling=sampling,
-                                                station_type=station_type, data_origin=data_origin)
+                                                data_origin=data_origin)
     except (AttributeError, EmptyQueryResult, KeyError, requests.ConnectionError, ValueError, IndexError):
         df_join, meta_join = None, None
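
For context on the surrounding control flow: download_toar queries toar-data v2 in one try block and the older JOIN (v1) interface in the one shown above, swallowing the listed exceptions so one failing backend does not abort the other. A generic sketch of that try-each-backend pattern, with placeholder loader callables (not the module's actual API):

    # Try each backend independently and keep whatever answered; mirrors the
    # v2-then-v1 structure of download_toar in spirit, not verbatim.
    def load_from_backends(backends, **query):
        results = {}
        for name, loader in backends:
            try:
                results[name] = loader(**query)
            except (AttributeError, KeyError, ValueError, IndexError) as e:
                print(f"{name} failed ({type(e).__name__}); trying next backend")
        return results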