Upsample "extremes" in standardised data space
To improve model performance on "extremes", it might help to upsample those values so that they occur more often in the training data. This comes at the cost of distorting the true data distribution.
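The core idea can be sketched with plain NumPy (a minimal illustration, not the project code; the function and array names here are made up):

```python
import numpy as np

def upsample_extremes(x, y, threshold=1.0):
    """Duplicate all samples whose standardised label exceeds +/- threshold.

    x: inputs of shape (n_samples, ...), y: standardised labels of shape (n_samples,).
    Returns the original data with the extreme samples appended once more.
    """
    mask = np.abs(y) > threshold  # "extreme" is defined in standardised space
    return np.concatenate([x, x[mask]]), np.concatenate([y, y[mask]])

x = np.arange(5).reshape(5, 1)             # dummy inputs
y = np.array([0.2, -1.5, 0.7, 2.3, -0.1])  # standardised labels
x_up, y_up = upsample_extremes(x, y, threshold=1.0)
# the two extremes (-1.5 and 2.3) now appear twice each
```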
Necessary modifications:

- in DataPrep
  - in `__init__` create two more entries: `self.extremes_labels = None` and `self.extremes_history = None`
  - create a method which extracts "extreme" values from the labels and the corresponding inputs. The extraction method could, for example, look like this:
```python
def multiply_extremes(self, extr_values=1., extremes_on_right_tail_only=False, timedelta=(1, 'm')):
    """
    This method extracts extreme values from self.label as defined by the argument extr_values.
    One can also decide to extract extremes on the right tail of the distribution only. When extr_values is a
    list of floats/ints, all values larger than extr_value (and smaller than -extr_value; extraction is
    performed in standardised space) are extracted iteratively. If for example extr_values = [1., 2.], a value
    of 1.5 is extracted once (for the 0th entry in the list), while a 2.5 is extracted twice (once for each
    entry). timedelta is used to mark the extracted values by adding one minute to each timestamp. As TOAR
    data are hourly, these "artificial" data points can easily be identified later. Extreme inputs and labels
    are stored in self.extremes_history and self.extremes_labels, respectively.

    :param extr_values: int/float or list of int/float - user definition of extreme
    :param extremes_on_right_tail_only: Boolean - if False, also multiply values which are smaller than
        -extr_value; if True, only extract values larger than extr_value
    :param timedelta: tuple of (int, str) used as arguments for np.timedelta64 in order to mark extreme
        values on the datetime axis
    """
    # check type of inputs
    if isinstance(extr_values, (float, int)):
        extr_values = [extr_values]
    elif isinstance(extr_values, list):
        for i in extr_values:
            if not isinstance(i, (float, int)):
                raise TypeError(f"Elements of list extr_values have to be int or float, "
                                f"but at least one is {type(i)}")
        extr_values.sort()
    else:
        raise TypeError(f"extr_values has to be float, int or list of ints or floats, but is {type(extr_values)}")
    for extr_value in extr_values:
        # check if some extreme values have already been extracted
        if (self.extremes_labels is None) and (self.extremes_history is None):
            # extract extremes based on their occurrence in the labels
            if extremes_on_right_tail_only:
                extreme_label_idx = (self.label > extr_value).any(axis=0).values.reshape(-1,)
            else:
                extreme_label_idx = np.concatenate(((self.label < -extr_value).any(axis=0).values.reshape(-1, 1),
                                                    (self.label > extr_value).any(axis=0).values.reshape(-1, 1)),
                                                   axis=1).any(axis=1)
            extremes_label = self.label[..., extreme_label_idx]
            extremes_history = self.history[..., extreme_label_idx, :]
            extremes_label.datetime.values += np.timedelta64(*timedelta)
            extremes_history.datetime.values += np.timedelta64(*timedelta)
            self.extremes_labels = extremes_label.squeeze('Stations').transpose('datetime', 'window')
            self.extremes_history = extremes_history.transpose('datetime', 'window', 'Stations', 'variables')
        else:  # at least one extr_value iteration is done already: self.extremes_labels is NOT None
            # compare against the already extracted extremes to minimise computational costs
            if extremes_on_right_tail_only:
                extreme_label_idx = (self.extremes_labels > extr_value).any(axis=1).values.reshape(-1,)
            else:
                extreme_label_idx = np.concatenate(
                    ((self.extremes_labels < -extr_value).any(axis=1).values.reshape(-1, 1),
                     (self.extremes_labels > extr_value).any(axis=1).values.reshape(-1, 1)),
                    axis=1).any(axis=1)
            extremes_label = self.extremes_labels[extreme_label_idx, ...]
            extremes_history = self.extremes_history[extreme_label_idx, ...]
            extremes_label.datetime.values += np.timedelta64(*timedelta)
            extremes_history.datetime.values += np.timedelta64(*timedelta)
            self.extremes_labels = xr.concat([self.extremes_labels, extremes_label], dim='datetime')
            self.extremes_history = xr.concat([self.extremes_history, extremes_history], dim='datetime')
```
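The iterative effect of a list of thresholds can be checked in isolation. The following is a simplified NumPy sketch of the selection logic only (the function name is made up; the xarray bookkeeping and timestamp shifting are left out):

```python
import numpy as np

def extraction_counts(labels, extr_values, right_tail_only=False):
    """Return how often each label would be extracted for the given thresholds.

    Mirrors the loop in multiply_extremes: the first pass selects from the full
    labels, every further pass selects from the already extracted values, so a
    label beyond the k largest thresholds is duplicated k times.
    """
    labels = np.asarray(labels, dtype=float)
    counts = np.zeros(len(labels), dtype=int)
    for extr_value in sorted(extr_values):
        if right_tail_only:
            counts += labels > extr_value
        else:
            counts += np.abs(labels) > extr_value
    return counts

# extr_values = [1., 2.]: 0.5 is never extracted, 1.5 once, +/-2.5 twice
counts = extraction_counts([0.5, 1.5, 2.5, -2.5], [1., 2.])
# counts -> [0, 1, 2, 2]
```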
- in DataGenerator
  - in `__init__` add an argument or kwarg extr_values
  - in get_data_generator add a call of DataPrep.multiply_extremes(). This could look like this:

    ```python
    extr_values = self.kwargs.get('extr_values', None)
    if extr_values is not None:
        data_item.multiply_extremes(extr_values=extr_values)
    ```
- in Distributor.distribute_on_batches
  - extract extremes_history and extremes_labels from the DataPrep object:

    ```python
    x_extremes = self.generator.get_data_generator(k).extremes_history
    y_extremes = self.generator.get_data_generator(k).extremes_labels
    ```

  - if extremes_history/extremes_labels are not None, concatenate copies (np.copy()) of extremes_history with x_total and extremes_labels with y_total:

    ```python
    if (x_extremes is not None) and (y_extremes is not None):
        x_total = np.concatenate([x_total, np.copy(x_extremes)], axis=0)
        y_total = np.concatenate([y_total, np.copy(y_extremes)], axis=0)
    ```

  - calculate the number of mini batches as a function of x_total/y_total instead of v:

    ```python
    num_mini_batches = self._get_number_of_mini_batches((x_total, y_total))
    ```

  - make sure this happens before the permutation (see #57 (closed))
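Put together, the Distributor changes amount to something like the following sketch. The array shapes are illustrative only, and a plain ceil stands in for the project's batch-count helper:

```python
import numpy as np

rng = np.random.default_rng(0)
batch_size = 4

# regular batch data and extracted extremes (shapes are made up)
x_total = rng.normal(size=(10, 3))
y_total = rng.normal(size=(10, 2))
x_extremes = rng.normal(size=(3, 3))
y_extremes = rng.normal(size=(3, 2))

# 1) append the extremes first ...
if (x_extremes is not None) and (y_extremes is not None):
    x_total = np.concatenate([x_total, np.copy(x_extremes)], axis=0)
    y_total = np.concatenate([y_total, np.copy(y_extremes)], axis=0)

# 2) ... recompute the number of mini batches from the enlarged arrays ...
num_mini_batches = int(np.ceil(len(x_total) / batch_size))

# 3) ... and only then permute, so the duplicates get mixed into all batches
perm = rng.permutation(len(x_total))
x_total, y_total = x_total[perm], y_total[perm]
```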
- in experimental_setup
  - add _set_param() for extr_values