Commit d1c26a21 authored by leufen1

start refac (not running version, /close #329)

parent b1de15c6
@@ -4,6 +4,7 @@ __author__ = 'Lukas Leufen'
 __date__ = '2020-08-26'

 import inspect
+import copy
 import numpy as np
 import pandas as pd
 import xarray as xr
@@ -12,7 +13,7 @@ from functools import partial
 import logging

 from mlair.data_handler.data_handler_single_station import DataHandlerSingleStation
 from mlair.data_handler import DefaultDataHandler
-from mlair.helpers import remove_items, to_list, TimeTrackingWrapper
+from mlair.helpers import remove_items, to_list, TimeTrackingWrapper, statistics
 from mlair.helpers.filter import KolmogorovZurbenkoFilterMovingWindow as KZFilter
 from mlair.helpers.filter import FIRFilter, ClimateFIRFilter, omega_null_kzf
@@ -162,6 +163,7 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation):
         self.filter_order = self._prepare_filter_order(filter_order, removed_index, self.fs)
         self.filter_window_type = filter_window_type
         self._add_unfiltered = filter_add_unfiltered
+        self.unfiltered_name = "unfiltered"
         super().__init__(*args, **kwargs)

     @staticmethod
@@ -250,7 +252,7 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation):
             index.append(f"band{band_num}")
             band_num += 1
         if self._add_unfiltered:
-            index.append("unfiltered")
+            index.append(self.unfiltered_name)
         self.filter_dim_order = index
         return pd.Index(index, name=self.filter_dim)
@@ -261,6 +263,97 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation):
         _data, _meta, _input_data, _target_data, self.fir_coeff, self.filter_dim_order = lazy_data
         super(__class__, self)._extract_lazy((_data, _meta, _input_data, _target_data))
+    def setup_transformation(self, transformation: Union[None, dict, Tuple]) -> Tuple[Optional[dict], Optional[dict]]:
+        """
+        Adjust the transformation setup: filtered data can contain negative values, which are not compatible with
+        the log transformation. Therefore, replace all log and min_max transformation methods by a default
+        standardisation; if the unfiltered branch is added, encode both methods as "<method>/standardise". This is
+        only applied on the input side.
+        """
+        transformation = DataHandlerSingleStation.setup_transformation(self, transformation)
+        if transformation[0] is not None:
+            unfiltered_option = lambda x: f"{x}/standardise" if self._add_unfiltered is True else "standardise"
+            for k, v in transformation[0].items():
+                if v["method"] in ["log", "min_max"]:
+                    transformation[0][k]["method"] = unfiltered_option(v["method"])
+        return transformation
+    def transform(self, data_in, dim: Union[str, int] = 0, inverse: bool = False, opts=None,
+                  transformation_dim=None):
+        """
+        Transform data according to the given transformation settings.
+
+        This function transforms an xarray.DataArray (along dim) or pandas.DataFrame (along axis) either to mean=0
+        and std=1 (`method=standardise`) or centres the data to mean=0 without changing the data scale
+        (`method=centre`). Furthermore, this sets an internal instance attribute for the later inverse
+        transformation. This method raises an AssertionError if an internal transform method was already set
+        (`inverse=False`) or if the internal transform method, internal mean, and internal standard deviation
+        weren't set (`inverse=True`).
+
+        :param string/int dim: This parameter is not used for inverse transformation.
+            | for xarray.DataArray as string: name of dimension which should be standardised
+            | for pandas.DataFrame as int: axis of dimension which should be standardised
+        :param inverse: Switch between transformation and inverse transformation.
+
+        :return: xarray.DataArrays or pandas.DataFrames:
+            #. mean: mean of data
+            #. std: standard deviation of data
+            #. data: standardised data
+        """
+        if transformation_dim is None:
+            transformation_dim = self.DEFAULT_TARGET_DIM
+
+        def f(data, method="standardise", feature_range=None):
+            if method == "standardise":
+                return statistics.standardise(data, dim)
+            elif method == "centre":
+                return statistics.centre(data, dim)
+            elif method == "min_max":
+                kwargs = {"feature_range": feature_range} if feature_range is not None else {}
+                return statistics.min_max(data, dim, **kwargs)
+            elif method == "log":
+                return statistics.log(data, dim)
+            else:
+                raise NotImplementedError
+
+        def f_apply(data, method, **kwargs):
+            for k, v in kwargs.items():
+                if not (isinstance(v, xr.DataArray) or v is None):
+                    _, opts = statistics.min_max(data, dim)
+                    helper = xr.ones_like(opts['min'])
+                    kwargs[k] = helper * v
+            mean = kwargs.pop('mean', None)
+            std = kwargs.pop('std', None)
+            min = kwargs.pop('min', None)
+            max = kwargs.pop('max', None)
+            feature_range = kwargs.pop('feature_range', None)
+
+            if method == "standardise":
+                return statistics.standardise_apply(data, mean, std), {"mean": mean, "std": std, "method": method}
+            elif method == "centre":
+                return statistics.centre_apply(data, mean), {"mean": mean, "method": method}
+            elif method == "min_max":
+                return statistics.min_max_apply(data, min, max), {"min": min, "max": max, "method": method,
+                                                                  "feature_range": feature_range}
+            elif method == "log":
+                return statistics.log_apply(data, mean, std), {"mean": mean, "std": std, "method": method}
+            else:
+                raise NotImplementedError
+
+        opts = opts or {}
+        opts_updated = {}
+        if not inverse:
+            transformed_values = []
+            for var in data_in.variables.values:
+                data_var = data_in.sel(**{transformation_dim: [var]})
+                var_opts = opts.get(var, {})
+                _apply = (var_opts.get("mean", None) is not None) or (var_opts.get("min") is not None)
+                values, new_var_opts = locals()["f_apply" if _apply else "f"](data_var, **var_opts)
+                opts_updated[var] = copy.deepcopy(new_var_opts)
+                transformed_values.append(values)
+            return xr.concat(transformed_values, dim=transformation_dim), opts_updated
+        else:
+            return self.inverse_transform(data_in, opts, transformation_dim)

 class DataHandlerFirFilter(DataHandlerFilter):
     """Data handler using FIR filtered data."""
@@ -399,7 +492,8 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation):
         # add unfiltered raw data
         if self._add_unfiltered is True:
             data_raw = self.shift(self.input_data, self.time_dim, -self.window_history_size)
-            data_raw = data_raw.expand_dims({self.filter_dim: ["unfiltered"]}, -1)
+            filter_dim = self.create_filter_index(add_unfiltered_index=True)[-1]
+            data_raw = data_raw.expand_dims({self.filter_dim: [filter_dim]}, -1)
             input_data = xr.concat([input_data, data_raw], self.filter_dim)
         self.input_data = input_data
@@ -423,7 +517,7 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation):
         index = list(map(f, index.tolist()))
         index = list(map(lambda x: str(x) + "d", index)) + ["res"]
         if self._add_unfiltered and add_unfiltered_index:
-            index.append("unfiltered")
+            index.append(self.unfiltered_name)
         self.filter_dim_order = index
         return pd.Index(index, name=self.filter_dim)
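Both create_filter_index variants now draw the trailing label from the shared unfiltered_name attribute instead of a hard-coded string. For a setup like the run script below (filter_cutoff_period=[3] with filter_add_unfiltered=True), the climate variant would yield an index along these lines (a sketch; the dimension name "filter" is an assumption, not taken from this diff):

import pandas as pd

unfiltered_name = "unfiltered"   # now defined once on the handler
index = ["3d", "res"]            # cutoff periods mapped to day labels, plus residual band
add_unfiltered = add_unfiltered_index = True
if add_unfiltered and add_unfiltered_index:
    index.append(unfiltered_name)  # assumption: dimension is named "filter"
print(pd.Index(index, name="filter"))
# Index(['3d', 'res', 'unfiltered'], dtype='object', name='filter')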
...
...@@ -384,7 +384,7 @@ class ClimateFIRFilter: ...@@ -384,7 +384,7 @@ class ClimateFIRFilter:
_end = pd.to_datetime(data.coords[time_dim].max().values).year _end = pd.to_datetime(data.coords[time_dim].max().values).year
filt_coll = [] filt_coll = []
for _year in range(_start, _end + 1): for _year in range(_start, _end + 1):
logging.info(f"{data.coords['Stations'].values[0]} ({var}): year={_year}") logging.debug(f"{data.coords['Stations'].values[0]} ({var}): year={_year}")
time_slice = self._create_time_range_extend(_year, sampling, extend_length_history) time_slice = self._create_time_range_extend(_year, sampling, extend_length_history)
d = data.sel({var_dim: [var], time_dim: time_slice}) d = data.sel({var_dim: [var], time_dim: time_slice})
...
@@ -5,6 +5,8 @@ import argparse

 from mlair.workflows import DefaultWorkflow
 from mlair.data_handler.data_handler_mixed_sampling import DataHandlerMixedSamplingWithClimateFirFilter
+from mlair.model_modules.fully_connected_networks import BranchedInputFCN as NN
+

 stats = {'o3': 'dma8eu', 'no': 'dma8eu', 'no2': 'dma8eu',
          'relhum': 'average_values', 'u': 'average_values', 'v': 'average_values',
@@ -19,25 +21,45 @@ data_origin = {'o3': '', 'no': '', 'no2': '',

 def main(parser_args):
     args = dict(stations=["DEBW107", "DEBW013"],
                 network="UBA",
-                evaluate_bootstraps=False, plot_list=[],
+                evaluate_bootstraps=False, plot_list=["PlotDataHistogram"],
                 data_origin=data_origin, data_handler=DataHandlerMixedSamplingWithClimateFirFilter,
                 interpolation_limit=(3, 1), overwrite_local_data=False,
+                overwrite_lazy_data=False,
                 lazy_preprocessing=True,
                 use_multiprocessing=True,
-                use_multiprocessing_on_debug=True,
+                use_multiprocessing_on_debug=False,
                 sampling=("hourly", "daily"),
                 statistics_per_var=stats,
                 create_new_model=True, train_model=False, epochs=1,
-                window_history_size=6 * 24 + 16,
+                # window_history_size=6 * 24 + 16,
+                window_history_size=2 * 24 + 16,
                 window_history_offset=16,
                 kz_filter_length=[100 * 24, 15 * 24],
                 kz_filter_iter=[4, 5],
-                filter_cutoff_period=[7, 0.8],
-                filter_order=[7, 2],
+                # filter_cutoff_period=[7, 0.8],
+                # filter_order=[7, 2],
+                # filter_cutoff_period=[21, 2.7, 11/24.],
+                # filter_order=[42, 7, 2],
+                filter_cutoff_period=[3],
+                filter_order=[7],
+                # extreme_values=[1.5, 1.75, 2, 2.25, 2.5, 3],
+                filter_add_unfiltered=True,
                 start="2006-01-01",
                 train_start="2006-01-01",
                 end="2011-12-31",
                 test_end="2011-12-31",
+                model=NN,
+                use_filter_branches=True,
+                transformation={
+                    "o3": {"method": "log"},
+                    "no": {"method": "log"},
+                    "no2": {"method": "log"},
+                    "cloudcover": {"method": "min_max"},
+                    "pblheight": {"method": "log"},
+                    "relhum": {"method": "min_max"},
+                    "temp": {"method": "standardise"},
+                    "u": {"method": "standardise"},
+                    "v": {"method": "standardise"}, },  # T1F, also apply same target transformation
                 **parser_args.__dict__,
                 )
     workflow = DefaultWorkflow(**args, start_script=__file__)
...
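Given the transformation dict in the run script above and filter_add_unfiltered=True, the new setup_transformation override rewrites the log and min_max entries on the input side to "<method>/standardise", while plain standardise entries pass through unchanged. A minimal standalone sketch of that substitution (dict values copied from the run script, reduced to three variables):

add_unfiltered = True  # mirrors filter_add_unfiltered=True above
transformation = {"o3": {"method": "log"},
                  "cloudcover": {"method": "min_max"},
                  "temp": {"method": "standardise"}}

# same substitution as in DataHandlerFirFilterSingleStation.setup_transformation
unfiltered_option = lambda x: f"{x}/standardise" if add_unfiltered is True else "standardise"
for k, v in transformation.items():
    if v["method"] in ["log", "min_max"]:
        transformation[k]["method"] = unfiltered_option(v["method"])

print(transformation)
# {'o3': {'method': 'log/standardise'},
#  'cloudcover': {'method': 'min_max/standardise'},
#  'temp': {'method': 'standardise'}}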