Compare revisions: esde/machine-learning/mlair
Changes are shown as if the source revision was being merged into the target revision.
Commits on Source (20)
docs/_source/_plots/datahistogram.png (51.4 KiB)

@@ -48,7 +48,7 @@ DEFAULT_CREATE_NEW_BOOTSTRAPS = False
DEFAULT_NUMBER_OF_BOOTSTRAPS = 20
DEFAULT_PLOT_LIST = ["PlotMonthlySummary", "PlotStationMap", "PlotClimatologicalSkillScore", "PlotTimeSeries",
"PlotCompetitiveSkillScore", "PlotBootstrapSkillScore", "PlotConditionalQuantiles",
"PlotAvailability", "PlotAvailabilityHistogram", "PlotSeparationOfScales"]
"PlotAvailability", "PlotAvailabilityHistogram", "PlotDataHistogram"]
DEFAULT_SAMPLING = "daily"
DEFAULT_DATA_ORIGIN = {"cloudcover": "REA", "humidity": "REA", "pblheight": "REA", "press": "REA", "relhum": "REA",
"temp": "REA", "totprecip": "REA", "u": "REA", "v": "REA", "no": "", "no2": "", "o3": "",
@@ -205,9 +205,9 @@ class DataHandlerSeparationOfScalesSingleStation(DataHandlerMixedSamplingWithFil
"""
window = -abs(window)
data = self.input_data
self.history = self.stride(data, dim_name_of_shift, window)
self.history = self.stride(data, dim_name_of_shift, window, offset=self.window_history_offset)
def stride(self, data: xr.DataArray, dim: str, window: int) -> xr.DataArray:
def stride(self, data: xr.DataArray, dim: str, window: int, offset: int = 0) -> xr.DataArray:
# this is just a code snippet to check the results of the kz filter
# import matplotlib
@@ -218,12 +218,13 @@ class DataHandlerSeparationOfScalesSingleStation(DataHandlerMixedSamplingWithFil
time_deltas = np.round(self.time_delta(self.cutoff_period)).astype(int)
start, end = window, 1
res = []
window_array = self.create_index_array(self.window_dim, range(start, end), squeeze_dim=self.target_dim)
_range = list(map(lambda x: x + offset, range(start, end)))
window_array = self.create_index_array(self.window_dim, _range, squeeze_dim=self.target_dim)
for delta, filter_name in zip(np.append(time_deltas, 1), data.coords["filter"]):
res_filter = []
data_filter = data.sel({"filter": filter_name})
for w in range(start, end):
res_filter.append(data_filter.shift({dim: -w * delta}))
for w in _range:
res_filter.append(data_filter.shift({dim: -(w - offset) * delta - offset}))
res_filter = xr.concat(res_filter, dim=window_array).chunk()
res.append(res_filter)
res = xr.concat(res, dim="filter").compute()
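The new offset (taken from window_history_offset) displaces both the window labels and the time shifts derived from them. A small arithmetic illustration with arbitrary example values:

```python
# illustrative only: how window_history_offset changes labels and shifts
window, offset, delta = -3, 2, 24
start, end = window, 1

labels_old = list(range(start, end))                                # [-3, -2, -1, 0]
shifts_old = [-w * delta for w in labels_old]                       # [72, 48, 24, 0]

labels_new = [x + offset for x in range(start, end)]                # [-1, 0, 1, 2]
shifts_new = [-(w - offset) * delta - offset for w in labels_new]   # [70, 46, 22, -2]
```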
@@ -183,13 +183,14 @@ class DataHandlerSingleStation(AbstractDataHandler):
#. data: Standardised data
"""
def f(data, method="standardise"):
def f(data, method="standardise", feature_range=None):
if method == "standardise":
return statistics.standardise(data, dim)
elif method == "centre":
return statistics.centre(data, dim)
elif method == "min_max":
return statistics.min_max(data, dim)
kwargs = {"feature_range": feature_range} if feature_range is not None else {}
return statistics.min_max(data, dim, **kwargs)
elif method == "log":
return statistics.log(data, dim)
else:
@@ -205,13 +206,15 @@ class DataHandlerSingleStation(AbstractDataHandler):
std = kwargs.pop('std', None)
min = kwargs.pop('min', None)
max = kwargs.pop('max', None)
feature_range = kwargs.pop('feature_range', None)
if method == "standardise":
return statistics.standardise_apply(data, mean, std), {"mean": mean, "std": std, "method": method}
elif method == "centre":
return statistics.centre_apply(data, mean), {"mean": mean, "method": method}
elif method == "min_max":
return statistics.min_max_apply(data, min, max), {"min": min, "max": max, "method": method}
return statistics.min_max_apply(data, min, max), {"min": min, "max": max, "method": method,
"feature_range": feature_range}
elif method == "log":
return statistics.log_apply(data, mean, std), {"mean": mean, "std": std, "method": method}
else:
@@ -658,13 +661,13 @@ class DataHandlerSingleStation(AbstractDataHandler):
current data is not transformed.
"""
def f_inverse(data, method, mean=None, std=None, min=None, max=None):
def f_inverse(data, method, mean=None, std=None, min=None, max=None, feature_range=None):
if method == "standardise":
return statistics.standardise_inverse(data, mean, std)
elif method == "centre":
return statistics.centre_inverse(data, mean)
elif method == "min_max":
return statistics.min_max_inverse(data, min, max)
return statistics.min_max_inverse(data, min, max, feature_range)
elif method == "log":
return statistics.log_inverse(data, mean, std)
else:
@@ -287,6 +287,8 @@ class DefaultDataHandler(AbstractDataHandler):
old = transformation_dict[i][var].get(k, None)
new = opts.get(k)
transformation_dict[i][var][k] = new if old is None else old.combine_first(new)
if "feature_range" in opts.keys():
transformation_dict[i][var]["feature_range"] = opts.get("feature_range", None)
if multiprocessing.cpu_count() > 1 and kwargs.get("use_multiprocessing", True) is True: # parallel solution
logging.info("use parallel transformation approach")
@@ -320,6 +322,8 @@ class DefaultDataHandler(AbstractDataHandler):
transformation_dict[i][k]["min"] = transformation[k]["min"].min(iter_dim)
if transformation[k]["max"] is not None:
transformation_dict[i][k]["max"] = transformation[k]["max"].max(iter_dim)
if "feature_range" in transformation[k].keys():
transformation_dict[i][k]["feature_range"] = transformation[k]["feature_range"]
except KeyError:
pop_list.append((i, k))
for (i, k) in pop_list:
@@ -111,8 +111,15 @@ class TrackParameter:
"""
Call method of decorator.
"""
self.track(*args)
return self.__wrapped__(*args, **kwargs)
name, obj, scope = self.track(*args)
f_name = self.__wrapped__.__name__
try:
res = self.__wrapped__(*args, **kwargs)
logging.debug(f"{f_name}: {name}({scope})={res if obj is None else obj}")
except Exception as e:
logging.debug(f"{f_name}: {name}({scope})={obj}")
raise
return res
def __get__(self, instance, cls):
"""Create bound method object and supply self argument to the decorated method. <Python Cookbook, p.347>"""
@@ -120,13 +127,13 @@ class TrackParameter:
def track(self, tracker_obj, *args):
name, obj, scope = self._decrypt_args(*args)
logging.debug(f"{self.__wrapped__.__name__}: {name}({scope})={obj}")
tracker = tracker_obj.tracker[-1]
new_entry = {"method": self.__wrapped__.__name__, "scope": scope}
if name in tracker:
tracker[name].append(new_entry)
else:
tracker[name] = [new_entry]
return name, obj, scope
@staticmethod
def _decrypt_args(*args):
@@ -20,7 +20,7 @@ Data = Union[xr.DataArray, pd.DataFrame]
def apply_inverse_transformation(data: Data, method: str = "standardise", mean: Data = None, std: Data = None,
max: Data = None, min: Data = None) -> Data:
max: Data = None, min: Data = None, feature_range: Data = None) -> Data:
"""
Apply inverse transformation for given statistics.
@@ -38,7 +38,7 @@ def apply_inverse_transformation(data: Data, method: str = "standardise", mean:
elif method == 'centre': # pragma: no branch
return centre_inverse(data, mean)
elif method == 'min_max': # pragma: no branch
return min_max_inverse(data, min, max)
return min_max_inverse(data, min, max, feature_range)
elif method == "log":
return log_inverse(data, mean, std)
else:
@@ -119,41 +119,45 @@ def centre_apply(data: Data, mean: Data) -> Data:
return data - mean
def min_max(data: Data, dim: Union[str, int]) -> Tuple[Data, Dict[(str, Data)]]:
def min_max(data: Data, dim: Union[str, int], feature_range: Tuple = (0, 1)) -> Tuple[Data, Dict[(str, Data)]]:
"""
Apply min/max scaling using (x - x_min) / (x_max - x_min), rescaled to the given feature range. By default the returned data lies in the interval [0, 1].
:param data: data to transform
:param dim: name (xarray) or axis (pandas) of the dimension along which to scale
:param feature_range: interval to scale the data to, given as (min, max). Default is [0, 1].
:return: transformed data, and dictionary with keys method, min, max, and feature_range
"""
d_max = data.max(dim)
d_min = data.min(dim)
return (data - d_min) / (d_max - d_min), {"min": d_min, "max": d_max, "method": "min_max"}
d_scaled = (data - d_min) / (d_max - d_min) * (max(feature_range) - min(feature_range)) + min(feature_range)
return d_scaled, {"min": d_min, "max": d_max, "method": "min_max", "feature_range": feature_range}
def min_max_inverse(data: Data, min: Data, max: Data) -> Data:
def min_max_inverse(data: Data, _min: Data, _max: Data, feature_range: Tuple = (0, 1)) -> Data:
"""
Apply inverse transformation of `min_max` scaling.
:param data: data to apply inverse scaling
:param min: minimum value to use for min/max scaling
:param max: maximum value to use for min/max scaling
:param _min: minimum value to use for min/max scaling
:param _max: maximum value to use for min/max scaling
:param feature_range: interval that the scaled data lies in, given as (min, max). Default is [0, 1].
:return: inverted min/max scaled data
"""
return data * (max - min) + min
return (data - min(feature_range)) / (max(feature_range) - min(feature_range)) * (_max - _min) + _min
def min_max_apply(data: Data, min: Data, max: Data) -> Data:
def min_max_apply(data: Data, _min: Data, _max: Data, feature_range: Data = (0, 1)) -> Data:
"""
Apply `min_max` scaling with given minimum and maximum.
:param data: data to apply scaling
:param min: minimum value to use for min/max scaling
:param max: maximum value to use for min/max scaling
:param _min: minimum value to use for min/max scaling
:param _max: maximum value to use for min/max scaling
:param feature_range: interval to scale the data to, given as (min, max). Default is [0, 1].
:return: min/max scaled data
"""
return (data - min) / (max - min)
return (data - _min) / (_max - _min) * (max(feature_range) - min(feature_range)) + min(feature_range)
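A quick standalone check of the feature_range arithmetic used in min_max, min_max_inverse and min_max_apply above (plain NumPy, no MLAir imports):

```python
import numpy as np

data = np.array([10.0, 15.0, 20.0])
d_min, d_max = data.min(), data.max()
low, high = -1, 1                                    # feature_range

scaled = (data - d_min) / (d_max - d_min) * (high - low) + low
print(scaled)                                        # [-1.  0.  1.]

restored = (scaled - low) / (high - low) * (d_max - d_min) + d_min
print(restored)                                      # [10. 15. 20.]
```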
def log(data: Data, dim: Union[str, int]) -> Tuple[Data, Dict[(str, Data)]]:
@@ -444,6 +444,134 @@ class PlotAvailabilityHistogram(AbstractPlotClass): # pragma: no cover
plt.tight_layout()
@TimeTrackingWrapper
class PlotDataHistogram(AbstractPlotClass): # pragma: no cover
"""
Plot histograms of the transformed input and target data. This is the same data the model sees during training; no
plots are created for the original value space (raw / untransformed data). For both inputs and targets, the method
creates one histogram per subset (train, val and test) as well as a combined plot comparing the three subsets.
.. image:: ../../../../../_source/_plots/datahistogram.png
:width: 400
"""
def __init__(self, generators: Dict[str, DataCollection], plot_folder: str = ".", plot_name="histogram",
variables_dim="variables", time_dim="datetime", window_dim="window"):
super().__init__(plot_folder, plot_name)
self.variables_dim = variables_dim
self.time_dim = time_dim
self.window_dim = window_dim
self.inputs, self.targets = self._get_inputs_targets(generators, self.variables_dim)
self.bins = {}
self.interval_width = {}
self.bin_edges = {}
# input plots
self._calculate_hist(generators, self.inputs, input_data=True)
for subset in generators.keys():
self._plot(add_name="input", subset=subset)
self._plot_combined(add_name="input")
# target plots
self._calculate_hist(generators, self.targets, input_data=False)
for subset in generators.keys():
self._plot(add_name="target", subset=subset)
self._plot_combined(add_name="target")
@staticmethod
def _get_inputs_targets(gens, dim):
k = list(gens.keys())[0]
gen = gens[k][0]
inputs = to_list(gen.get_X(as_numpy=False)[0].coords[dim].values.tolist())
targets = to_list(gen.get_Y(as_numpy=False).coords[dim].values.tolist())
return inputs, targets
def _calculate_hist(self, generators, variables, input_data=True):
n_bins = 100
for set_type, generator in generators.items():
tmp_bins = {}
tmp_edges = {}
end = {}
start = {}
f = lambda x: x.get_X(as_numpy=False)[0] if input_data is True else x.get_Y(as_numpy=False)
for gen in generator:
w = min(abs(f(gen).coords[self.window_dim].values))
data = f(gen).sel({self.window_dim: w})
res, _, g_edges = f_proc_hist(data, variables, n_bins, self.variables_dim)
for var in variables:
b = tmp_bins.get(var, [])
b.append(res[var])
tmp_bins[var] = b
e = tmp_edges.get(var, [])
e.append(g_edges[var])
tmp_edges[var] = e
end[var] = max([end.get(var, g_edges[var].max()), g_edges[var].max()])
start[var] = min([start.get(var, g_edges[var].min()), g_edges[var].min()])
# interpolate and aggregate
bins = {}
edges = {}
interval_width = {}
for var in variables:
bin_edges = np.linspace(start[var], end[var], n_bins + 1)
interval_width[var] = bin_edges[1] - bin_edges[0]
for i, e in enumerate(tmp_bins[var]):
bins_interp = np.interp(bin_edges[:-1], tmp_edges[var][i][:-1], e, left=0, right=0)
bins[var] = bins.get(var, np.zeros(n_bins)) + bins_interp
edges[var] = bin_edges
self.bins[set_type] = bins
self.interval_width[set_type] = interval_width
self.bin_edges[set_type] = edges
def _plot(self, add_name, subset):
plot_path = os.path.join(os.path.abspath(self.plot_folder), f"{self.plot_name}_{subset}_{add_name}.pdf")
pdf_pages = matplotlib.backends.backend_pdf.PdfPages(plot_path)
bins = self.bins[subset]
bin_edges = self.bin_edges[subset]
interval_width = self.interval_width[subset]
colors = self.get_dataset_colors()
for var in bins.keys():
fig, ax = plt.subplots()
hist_var = bins[var]
n_var = sum(hist_var)
weights = hist_var / (interval_width[var] * n_var)
ax.hist(bin_edges[var][:-1], bin_edges[var], weights=weights, color=colors[subset])
ax.set_ylabel("probability density")
ax.set_xlabel(f"values")
ax.set_title(f"histogram {var} ({subset}, n={int(n_var)})")
pdf_pages.savefig()
# close all open figures / plots
pdf_pages.close()
plt.close('all')
def _plot_combined(self, add_name):
plot_path = os.path.join(os.path.abspath(self.plot_folder), f"{self.plot_name}_{add_name}.pdf")
pdf_pages = matplotlib.backends.backend_pdf.PdfPages(plot_path)
variables = self.bins[list(self.bins.keys())[0]].keys()
colors = self.get_dataset_colors()
for var in variables:
fig, ax = plt.subplots()
for subset in self.bins.keys():
hist_var = self.bins[subset][var]
interval_width = self.interval_width[subset][var]
bin_edges = self.bin_edges[subset][var]
n_var = sum(hist_var)
weights = hist_var / (interval_width * n_var)
ax.plot(bin_edges[:-1] + 0.5 * interval_width, weights, label=f"{subset}",
c=colors[subset])
ax.set_ylabel("probability density")
ax.set_xlabel("values")
ax.legend(loc="upper right")
ax.set_title(f"histogram {var}")
pdf_pages.savefig()
# close all open figures / plots
pdf_pages.close()
plt.close('all')
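The weights used in _plot and _plot_combined convert raw bin counts into a probability density (counts divided by bin width times total count), so the plotted histogram integrates to one. A quick standalone check:

```python
import numpy as np

counts, edges = np.histogram(np.random.normal(size=1000), bins=100)
width = edges[1] - edges[0]
density = counts / (width * counts.sum())
print(np.isclose((density * width).sum(), 1.0))      # True
```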
@TimeTrackingWrapper
class PlotPeriodogram(AbstractPlotClass): # pragma: no cover
"""
Create Lomb-Scargle periodogram in raw input and target data. The Lomb-Scargle version can deal with missing values.
@@ -698,14 +826,14 @@ class PlotPeriodogram(AbstractPlotClass): # pragma: no cover
plt.close('all')
def f_proc(var, d_var):
def f_proc(var, d_var): # pragma: no cover
var_str = str(var)
t = (d_var.datetime - d_var.datetime[0]).astype("timedelta64[h]").values / np.timedelta64(1, "D")
f, pgram = LombScargle(t, d_var.values.flatten(), nterms=1).autopower()
return var_str, f, pgram
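f_proc computes a Lomb-Scargle periodogram per variable with astropy, which tolerates irregular or missing sampling in the raw series. A minimal standalone sketch of that call on synthetic data (a weekly cycle should show up as a peak near 1/7 cycles per day):

```python
import numpy as np
from astropy.timeseries import LombScargle

t = np.sort(np.random.uniform(0, 365, 800))               # irregular sampling, in days
y = np.sin(2 * np.pi * t / 7.0) + np.random.normal(0, 0.3, t.size)

frequency, power = LombScargle(t, y, nterms=1).autopower()
print(frequency[np.argmax(power)])                         # ~0.14 cycles per day
```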
def f_proc_2(g, m, pos, variables_dim, time_dim):
def f_proc_2(g, m, pos, variables_dim, time_dim): # pragma: no cover
raw_data_single = dict()
if m == 0:
d = g.id_class._data
@@ -719,3 +847,14 @@ def f_proc_2(g, m, pos, variables_dim, time_dim):
var_str, f, pgram = f_proc(var, d_var)
raw_data_single[var_str] = [(f, pgram)]
return raw_data_single
def f_proc_hist(data, variables, n_bins, variables_dim): # pragma: no cover
res = {}
bin_edges = {}
interval_width = {}
for var in variables:
d = data.sel({variables_dim: var}).squeeze() if len(data.shape) > 1 else data
res[var], bin_edges[var] = np.histogram(d.values, n_bins)
interval_width[var] = bin_edges[var][1] - bin_edges[var][0]
return res, interval_width, bin_edges
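f_proc_hist returns per-station counts and bin edges per variable; _calculate_hist above then merges stations by interpolating each station's counts onto a common set of bin edges and summing. A standalone sketch of that aggregation step:

```python
import numpy as np

n_bins = 100
stations = [np.random.normal(0, 1, 500), np.random.normal(1, 2, 800)]

hists = [np.histogram(d, n_bins) for d in stations]        # (counts, edges) per station
start = min(edges.min() for _, edges in hists)
end = max(edges.max() for _, edges in hists)
common_edges = np.linspace(start, end, n_bins + 1)

total = np.zeros(n_bins)
for counts, edges in hists:
    total += np.interp(common_edges[:-1], edges[:-1], counts, left=0, right=0)
# `total` now holds the combined histogram on the shared bin grid
```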
@@ -22,7 +22,7 @@ from mlair.model_modules import AbstractModelClass
from mlair.plotting.postprocessing_plotting import PlotMonthlySummary, PlotClimatologicalSkillScore, \
PlotCompetitiveSkillScore, PlotTimeSeries, PlotBootstrapSkillScore, PlotConditionalQuantiles, PlotSeparationOfScales
from mlair.plotting.data_insight_plotting import PlotStationMap, PlotAvailability, PlotAvailabilityHistogram, \
PlotPeriodogram
PlotPeriodogram, PlotDataHistogram
from mlair.run_modules.run_environment import RunEnvironment
@@ -398,6 +398,13 @@ class PostProcessing(RunEnvironment):
except Exception as e:
logging.error(f"Could not create plot PlotPeriodogram due to the following error: {e}")
try:
if "PlotDataHistogram" in plot_list:
gens = {"train": self.train_data, "val": self.val_data, "test": self.test_data}
PlotDataHistogram(gens, plot_folder=self.plot_path, time_dim=time_dim, variables_dim=target_dim)
except Exception as e:
logging.error(f"Could not create plot PlotDataHistogram due to the following error: {e}")
def calculate_test_score(self):
"""Evaluate test score of model and save locally."""
@@ -5,6 +5,7 @@ __date__ = '2019-11-25'
import logging
import os
import traceback
from typing import Tuple
import multiprocessing
import requests
@@ -337,6 +338,15 @@ def f_proc(data_handler, station, name_affix, store, **kwargs):
try:
res = data_handler.build(station, name_affix=name_affix, store_processed_data=store, **kwargs)
except (AttributeError, EmptyQueryResult, KeyError, requests.ConnectionError, ValueError) as e:
logging.info(f"remove station {station} because it raised an error: {e}")
formatted_lines = traceback.format_exc().splitlines()
logging.info(
f"remove station {station} because it raised an error: {e} -> {' | '.join(f_inspect_error(formatted_lines))}")
res = None
return res, station
def f_inspect_error(formatted):
for i in range(len(formatted) - 1, -1, -1):
if "mlair/mlair" not in formatted[i]:
return formatted[i - 3:i]
return formatted[-3:]
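Given the helper above, a contrived illustration (hypothetical paths) of which traceback lines end up in the log message:

```python
formatted = [
    "Traceback (most recent call last):",
    '  File "/opt/mlair/mlair/run_modules/pre_processing.py", line 338, in f_proc',
    "    res = data_handler.build(station, ...)",
    '  File "/home/user/custom_handler.py", line 12, in build',
    "    raise ValueError('no data')",
    "ValueError: no data",
]
print(f_inspect_error(formatted))
# ['    res = data_handler.build(station, ...)',
#  '  File "/home/user/custom_handler.py", line 12, in build',
#  "    raise ValueError('no data')"]
```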
@@ -68,4 +68,4 @@ class TestAllDefaults:
assert DEFAULT_PLOT_LIST == ["PlotMonthlySummary", "PlotStationMap", "PlotClimatologicalSkillScore",
"PlotTimeSeries", "PlotCompetitiveSkillScore", "PlotBootstrapSkillScore",
"PlotConditionalQuantiles", "PlotAvailability", "PlotAvailabilityHistogram",
"PlotSeparationOfScales"]
"PlotDataHistogram"]