Commit e70b66cb authored by Lukas Leufen

include current dev from #141, still transform functionality missing

parents f2ba3a15 f5731489
__author__ = 'Lukas Leufen'
__date__ = '2020-07-08'

import datetime as dt
import logging
import os
import pickle
from functools import reduce
from typing import Union, List, Tuple

import numpy as np
import pandas as pd
import xarray as xr

from src.helpers import to_list, remove_items

number = Union[float, int]
num_or_list = Union[number, List[number]]

class DummyDataSingleStation:  # pragma: no cover
def __init__(self, name, number_of_samples=None):
self.name = name
self.number_of_samples = number_of_samples if number_of_samples is not None else np.random.randint(100, 150)
def get_X(self):
X1 = np.random.randint(0, 10, size=(self.number_of_samples, 14, 5)) # samples, window, variables
datelist = pd.date_range(dt.datetime.today().date(), periods=self.number_of_samples, freq="H").tolist()
return xr.DataArray(X1, dims=['datetime', 'window', 'variables'], coords={"datetime": datelist,
"window": range(14),
"variables": range(5)})
def get_Y(self):
Y1 = np.round(0.5 * np.random.randn(self.number_of_samples, 5, 1), 1) # samples, window, variables
datelist = pd.date_range(dt.datetime.today().date(), periods=self.number_of_samples, freq="H").tolist()
return xr.DataArray(Y1, dims=['datetime', 'window', 'variables'], coords={"datetime": datelist,
"window": range(5),
"variables": range(1)})
def __str__(self):
return self.name

class DataPreparation:

    def __init__(self, id_class, interpolate_dim: str, store_path, neighbors=None, min_length=0,
                 extreme_values: num_or_list = None, extremes_on_right_tail_only: bool = False):
self.id_class = id_class
self.neighbors = to_list(neighbors) if neighbors is not None else []
self.interpolate_dim = interpolate_dim
self.min_length = min_length
self._X = None
self._Y = None
self._X_extreme = None
self._Y_extreme = None
self._save_file = os.path.join(store_path, f"data_preparation_{str(self.id_class)}.pickle")
self._collection = []
self._create_collection()
self.harmonise_X()
self.multiply_extremes(extreme_values, extremes_on_right_tail_only, dim=self.interpolate_dim)
self._store(fresh_store=True)
def _reset_data(self):
self._X, self._Y, self._X_extreme, self._Y_extreme = None, None, None, None
    def _cleanup(self):
        directory = os.path.dirname(self._save_file)
        if not os.path.exists(directory):
            os.makedirs(directory)
        if os.path.exists(self._save_file):
            os.remove(self._save_file)  # the save target is a single pickle file, not a directory
    def _store(self, fresh_store=False):
        if fresh_store:
            self._cleanup()
data = {"X": self._X, "Y": self._Y, "X_extreme": self._X_extreme, "Y_extreme": self._Y_extreme}
with open(self._save_file, "wb") as f:
pickle.dump(data, f)
logging.debug(f"save pickle data to {self._save_file}")
self._reset_data()
def _load(self):
try:
with open(self._save_file, "rb") as f:
data = pickle.load(f)
logging.debug(f"load pickle data from {self._save_file}")
self._X, self._Y = data["X"], data["Y"]
self._X_extreme, self._Y_extreme = data["X_extreme"], data["Y_extreme"]
except FileNotFoundError:
pass
def get_data(self, upsampling=False, as_numpy=True):
self._load()
X = self.get_X(upsampling, as_numpy)
Y = self.get_Y(upsampling, as_numpy)
self._reset_data()
return X, Y
def _create_collection(self):
for data_class in [self.id_class] + self.neighbors:
self._collection.append(data_class)
    def __repr__(self):
        return ";".join(str(x) for x in self._collection)
def get_X_original(self):
X = []
for data in self._collection:
X.append(data.get_X())
return X
def get_Y_original(self):
Y = self._collection[0].get_Y()
return Y
    @staticmethod
    def _to_numpy(d):
        return [np.copy(x) for x in d]
    def get_X(self, upsampling=False, as_numpy=True):
        no_data = (self._X is None)
        if no_data:
            self._load()
        X = self._X_extreme if upsampling else self._X
        if no_data:
            self._reset_data()
        return self._to_numpy(X) if as_numpy else X
    def get_Y(self, upsampling=False, as_numpy=True):
        no_data = (self._Y is None)
        if no_data:
            self._load()
        Y = self._Y_extreme if upsampling else self._Y
        if no_data:
            self._reset_data()
        return self._to_numpy([Y]) if as_numpy else Y
def harmonise_X(self):
X_original, Y_original = self.get_X_original(), self.get_Y_original()
dim = self.interpolate_dim
intersect = reduce(np.intersect1d, map(lambda x: x.coords[dim].values, X_original))
if len(intersect) < max(self.min_length, 1):
X, Y = None, None
else:
X = list(map(lambda x: x.sel({dim: intersect}), X_original))
Y = Y_original.sel({dim: intersect})
self._X, self._Y = X, Y
def multiply_extremes(self, extreme_values: num_or_list = 1., extremes_on_right_tail_only: bool = False,
timedelta: Tuple[int, str] = (1, 'm'), dim="datetime"):
"""
Multiply extremes.
This method extracts extreme values from self.labels which are defined in the argument extreme_values. One can
also decide only to extract extremes on the right tail of the distribution. When extreme_values is a list of
floats/ints all values larger (and smaller than negative extreme_values; extraction is performed in standardised
space) than are extracted iteratively. If for example extreme_values = [1.,2.] then a value of 1.5 would be
extracted once (for 0th entry in list), while a 2.5 would be extracted twice (once for each entry). Timedelta is
used to mark those extracted values by adding one min to each timestamp. As TOAR Data are hourly one can
identify those "artificial" data points later easily. Extreme inputs and labels are stored in
self.extremes_history and self.extreme_labels, respectively.
:param extreme_values: user definition of extreme
:param extremes_on_right_tail_only: if False also multiply values which are smaller then -extreme_values,
if True only extract values larger than extreme_values
:param timedelta: used as arguments for np.timedelta in order to mark extreme values on datetime
"""
# check if X or Y is None
if (self._X is None) or (self._Y is None):
logging.debug(f"{str(self.id_class)} has no data for X or Y, skip multiply extremes")
return
if extreme_values is None:
logging.debug(f"No extreme values given, skip multiply extremes")
self._X_extreme, self._Y_extreme = self._X, self._Y
return
        # check types of inputs
        extreme_values = to_list(extreme_values)
        for i in extreme_values:
            if not isinstance(i, number.__args__):
                raise TypeError(f"Elements of list extreme_values have to be {number.__args__}, but at least "
                                f"element {i} is of type {type(i)}")
for extr_val in sorted(extreme_values):
# check if some extreme values are already extracted
if (self._X_extreme is None) or (self._Y_extreme is None):
X = self._X
Y = self._Y
            else:  # at least one extreme-value iteration is already done: _X_extreme/_Y_extreme are not None
                X = self._X_extreme
                Y = self._Y_extreme
# extract extremes based on occurrence in labels
other_dims = remove_items(list(Y.dims), dim)
if extremes_on_right_tail_only:
extreme_idx = (Y > extr_val).any(dim=other_dims)
else:
extreme_idx = xr.concat([(Y < -extr_val).any(dim=other_dims[0]),
(Y > extr_val).any(dim=other_dims[0])],
dim=other_dims[1]).any(dim=other_dims[1])
extremes_X = list(map(lambda x: x.sel(**{dim: extreme_idx}), X))
self._add_timedelta(extremes_X, dim, timedelta)
extremes_Y = Y.sel(**{dim: extreme_idx})
extremes_Y.coords[dim].values += np.timedelta64(*timedelta)
self._Y_extreme = xr.concat([Y, extremes_Y], dim=dim)
self._X_extreme = list(map(lambda x1, x2: xr.concat([x1, x2], dim=dim), X, extremes_X))
@staticmethod
def _add_timedelta(data, dim, timedelta):
for d in data:
d.coords[dim].values += np.timedelta64(*timedelta)
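

# Hedged demo sketch (editor's illustration, not part of the original module): shows how
# multiply_extremes duplicates tail samples, as referenced in its docstring. It uses only
# the classes defined above; the function name demo_multiply_extremes and the threshold
# extreme_values=[1.] are illustrative assumptions.
def demo_multiply_extremes():
    path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "testdata")
    # labels of DummyDataSingleStation are ~N(0, 0.5), so |y| > 1 marks roughly the 2-sigma tails
    data_prep = DataPreparation(DummyDataSingleStation("demo"), "datetime", path, extreme_values=[1.])
    _, Y = data_prep.get_data(upsampling=False, as_numpy=False)
    _, Y_up = data_prep.get_data(upsampling=True, as_numpy=False)
    # each tail sample appears once more in the upsampled labels, shifted by +1 minute
    print(f"labels without upsampling: {Y.coords['datetime'].size}, with upsampling: {Y_up.coords['datetime'].size}")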

def run_data_prep():
data = DummyDataSingleStation("main_class")
data.get_X()
data.get_Y()
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "testdata")
data_prep = DataPreparation(DummyDataSingleStation("main_class"), "datetime", path,
neighbors=[DummyDataSingleStation("neighbor1"), DummyDataSingleStation("neighbor2")],
extreme_values=[1., 1.2])
data_prep.get_data(upsampling=False)

def create_data_prep():
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "testdata")
station_type = None
network = 'UBA'
sampling = 'daily'
target_dim = 'variables'
target_var = 'o3'
interpolate_dim = 'datetime'
window_history_size = 7
window_lead_time = 3
    central_station = StationPrep(path, "DEBW011", {'o3': 'dma8eu', 'temp': 'maximum'}, {}, station_type, network,
                                  sampling, target_dim, target_var, interpolate_dim, window_history_size,
                                  window_lead_time)
    neighbor1 = StationPrep(path, "DEBW013", {'o3': 'dma8eu', 'temp-rea-miub': 'maximum'}, {}, station_type, network,
                            sampling, target_dim, target_var, interpolate_dim, window_history_size, window_lead_time)
    neighbor2 = StationPrep(path, "DEBW034", {'o3': 'dma8eu', 'temp': 'maximum'}, {}, station_type, network, sampling,
                            target_dim, target_var, interpolate_dim, window_history_size, window_lead_time)
data_prep = []
data_prep.append(DataPreparation(central_station, interpolate_dim, path, neighbors=[neighbor1, neighbor2]))
data_prep.append(DataPreparation(neighbor1, interpolate_dim, path, neighbors=[central_station, neighbor2]))
data_prep.append(DataPreparation(neighbor2, interpolate_dim, path, neighbors=[neighbor1, central_station]))
return data_prep

if __name__ == "__main__":
from src.data_handling.data_preparation import StationPrep
from src.data_handling.iterator import KerasIterator, DataCollection
data_prep = create_data_prep()
data_collection = DataCollection(data_prep)
for data in data_collection:
print(data)
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "testdata", "keras")
keras_it = KerasIterator(data_collection, 100, path)
keras_it[2]