Commit 3ce10ad9 authored by lukas leufen

include current dev status of #140

parents 5adcc794 8ed78575
4 merge requests: !136 "update release branch", !135 "Release v0.11.0", !134 "MLAir is decoupled from join", !118 "WIP: Resolve 'Implement Data Preparation'"
@@ -25,6 +25,387 @@ num_or_list = Union[number, List[number]]
data_or_none = Union[xr.DataArray, None]
class AbstractStationPrep:
def __init__(self, path, station, statistics_per_var, **kwargs):
# passed parameters
self.path = os.path.abspath(path)
self.station = helpers.to_list(station)
self.statistics_per_var = statistics_per_var
# self.target_dim = 'variable'
self.kwargs = kwargs
# internal
self.data = None
self.meta = None
self.variables = kwargs.get('variables', list(statistics_per_var.keys()))
self.history = None
self.label = None
self.observation = None
def load_data(self):
try:
self.read_data_from_disk()
except FileNotFoundError:
self.download_data()
self.load_data()
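    # Fallback logic: try the local copy first; on FileNotFoundError, download the data
    # and retry the read. Concrete subclasses (e.g. StationPrep below) implement the two
    # methods that follow.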
def read_data_from_disk(self):
raise NotImplementedError
def download_data(self):
raise NotImplementedError
def check_for_negative_concentrations(self, data: xr.DataArray, minimum: int = 0) -> xr.DataArray:
"""
Set all negative concentrations to zero.
Names of all concentrations are extracted from https://join.fz-juelich.de/services/rest/surfacedata/
        #2.1 Parameters. Currently, this check is applied to "benzene", "ch4", "co", "ethane", "no", "no2", "nox",
"o3", "ox", "pm1", "pm10", "pm2p5", "propane", "so2", and "toluene".
:param data: data array containing variables to check
:param minimum: minimum value, by default this should be 0
:return: corrected data
"""
chem_vars = ["benzene", "ch4", "co", "ethane", "no", "no2", "nox", "o3", "ox", "pm1", "pm10", "pm2p5",
"propane", "so2", "toluene"]
used_chem_vars = list(set(chem_vars) & set(self.variables))
data.loc[..., used_chem_vars] = data.loc[..., used_chem_vars].clip(min=minimum)
return data
def shift(self, dim: str, window: int) -> xr.DataArray:
"""
Shift data multiple times to represent history (if window <= 0) or lead time (if window > 0).
        :param dim: dimension along which the shift is applied
:param window: number of steps to shift (corresponds to the window length)
:return: shifted data
"""
start = 1
end = 1
if window <= 0:
start = window
else:
end = window + 1
res = []
for w in range(start, end):
res.append(self.data.shift({dim: -w}))
window_array = self.create_index_array('window', range(start, end), squeeze_dim=self.target_dim)
res = xr.concat(res, dim=window_array)
return res
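    # Illustrative sketch: for window=-2 and dim='datetime', the loop runs over
    # w in (-2, -1, 0) and entry w of the new 'window' dimension holds
    # self.data.shift({'datetime': -w}), i.e. the value observed |w| steps earlier;
    # for window=3 the entries 1, 2, 3 hold the values 1 to 3 steps ahead.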
@staticmethod
def create_index_array(index_name: str, index_value: Iterable[int], squeeze_dim: str) -> xr.DataArray:
"""
Create an 1D xr.DataArray with given index name and value.
:param index_name: name of dimension
:param index_value: values of this dimension
:return: this array
"""
ind = pd.DataFrame({'val': index_value}, index=index_value)
res = xr.Dataset.from_dataframe(ind).to_array(squeeze_dim).rename({'index': index_name}).squeeze(
dim=squeeze_dim,
drop=True
)
res.name = index_name
return res
def _set_file_name(self):
all_vars = sorted(self.statistics_per_var.keys())
return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(all_vars)}.nc")
def _set_meta_file_name(self):
all_vars = sorted(self.statistics_per_var.keys())
return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(all_vars)}_meta.csv")
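    # Example (hypothetical values): for station 'DEBW107' with statistics for 'o3' and
    # 'temp', the helpers above yield 'DEBW107_o3_temp.nc' and 'DEBW107_o3_temp_meta.csv'
    # inside self.path.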
def interpolate(self, dim: str, method: str = 'linear', limit: int = None, use_coordinate: Union[bool, str] = True,
**kwargs):
"""
Interpolate values according to different methods.
        (Copied from xr.DataArray.interpolate_na.)
:param dim:
Specifies the dimension along which to interpolate.
:param method:
{'linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic',
'polynomial', 'barycentric', 'krog', 'pchip',
'spline', 'akima'}, optional
String indicating which method to use for interpolation:
- 'linear': linear interpolation (Default). Additional keyword
arguments are passed to ``numpy.interp``
- 'nearest', 'zero', 'slinear', 'quadratic', 'cubic',
'polynomial': are passed to ``scipy.interpolate.interp1d``. If
method=='polynomial', the ``order`` keyword argument must also be
provided.
            - 'barycentric', 'krog', 'pchip', 'spline', and 'akima': use their
              respective ``scipy.interpolate`` classes.
:param limit:
default None
Maximum number of consecutive NaNs to fill. Must be greater than 0
or None for no limit.
:param use_coordinate:
default True
Specifies which index to use as the x values in the interpolation
            formulated as `y = f(x)`. If False, values are treated as if
            equally-spaced along `dim`. If True, the IndexVariable `dim` is
            used. If use_coordinate is a string, it specifies the name of a
            coordinate variable to use as the index.
        :param kwargs: additional keyword arguments passed on to the interpolation method
        :return: None; the interpolated data is stored in self.data
"""
self.data = self.data.interpolate_na(dim=dim, method=method, limit=limit, use_coordinate=use_coordinate,
**kwargs)
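    # Usage sketch (hypothetical call): self.interpolate('datetime', method='linear', limit=1)
    # fills single missing time steps by linear interpolation and leaves longer gaps as NaN.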
def make_history_window(self, dim_name_of_inputs: str, window: int, dim_name_of_shift: str) -> None:
"""
Create a xr.DataArray containing history data.
        Shift the data window+1 times and store an xarray with a new dimension 'window' containing the shifted
        data. This is used to represent history in the data. Results are stored in the history attribute.
:param dim_name_of_inputs: Name of dimension which contains the input variables
:param window: number of time steps to look back in history
            Note: window will be treated as a negative value. This should be in agreement with looking back on
                a time line. Nonetheless, positive values are allowed but are converted to their negative
                expression.
:param dim_name_of_shift: Dimension along shift will be applied
"""
window = -abs(window)
self.history = self.shift(dim_name_of_shift, window).sel({dim_name_of_inputs: self.variables})
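    # For example, window=7 is converted to -7, so history gains a 'window' dimension
    # with entries -7, ..., 0 (seven steps back plus the current step).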
def make_labels(self, dim_name_of_target: str, target_var: str_or_list, dim_name_of_shift: str,
window: int) -> None:
"""
Create a xr.DataArray containing labels.
        Labels are defined as the consecutive target values (t+1, ..., t+n) following the current time step t. Set the label
attribute.
:param dim_name_of_target: Name of dimension which contains the target variable
:param target_var: Name of target variable in 'dimension'
:param dim_name_of_shift: Name of dimension on which xarray.DataArray.shift will be applied
:param window: lead time of label
"""
window = abs(window)
self.label = self.shift(dim_name_of_shift, window).sel({dim_name_of_target: target_var})
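    # For example, window=3 yields a 'window' dimension with entries 1, 2, 3,
    # i.e. the target values at t+1 to t+3.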
def make_observation(self, dim_name_of_target: str, target_var: str_or_list, dim_name_of_shift: str) -> None:
"""
Create a xr.DataArray containing observations.
        Observations are defined as the value of the current time step t. Set the observation attribute.
:param dim_name_of_target: Name of dimension which contains the observation variable
:param target_var: Name of observation variable(s) in 'dimension'
:param dim_name_of_shift: Name of dimension on which xarray.DataArray.shift will be applied
"""
self.observation = self.shift(dim_name_of_shift, 0).sel({dim_name_of_target: target_var})
def remove_nan(self, dim: str) -> None:
"""
        Remove all slices along dim that contain NaNs in history, label, or observation.
        This is done to present only a full matrix to keras.fit. Update the history, label, and observation attributes.
        :param dim: dimension along which the removal is performed
"""
intersect = []
if (self.history is not None) and (self.label is not None):
non_nan_history = self.history.dropna(dim=dim)
non_nan_label = self.label.dropna(dim=dim)
non_nan_observation = self.observation.dropna(dim=dim)
intersect = reduce(np.intersect1d, (non_nan_history.coords[dim].values, non_nan_label.coords[dim].values,
non_nan_observation.coords[dim].values))
min_length = self.kwargs.get("min_length", 0)
if len(intersect) < max(min_length, 1):
self.history = None
self.label = None
self.observation = None
else:
self.history = self.history.sel({dim: intersect})
self.label = self.label.sel({dim: intersect})
self.observation = self.observation.sel({dim: intersect})
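    # After this call, history, label, and observation either share an identical index
    # along dim, or (if fewer than max(min_length, 1) common time steps remain) are all
    # set to None so the station yields no samples.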
def _slice_prep(self, data: xr.DataArray, coord: str = 'datetime') -> xr.DataArray:
"""
Set start and end date for slicing and execute self._slice().
:param data: data to slice
:param coord: name of axis to slice
:return: sliced data
"""
start = self.kwargs.get('start', data.coords[coord][0].values)
end = self.kwargs.get('end', data.coords[coord][-1].values)
return self._slice(data, start, end, coord)
@staticmethod
def _slice(data: xr.DataArray, start: Union[date, str], end: Union[date, str], coord: str) -> xr.DataArray:
"""
Slice through a given data_item (for example select only values of 2011).
:param data: data to slice
:param start: start date of slice
:param end: end date of slice
:param coord: name of axis to slice
:return: sliced data
"""
return data.loc[{coord: slice(str(start), str(end))}]
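    # e.g. _slice(data, "2011-01-01", "2011-12-31", "datetime") selects only values of 2011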
class StationPrep(AbstractStationPrep):
def __init__(self, path, station, statistics_per_var, station_type, network, sampling, target_dim, target_var,
interpolate_dim, window_history_size, window_lead_time, **kwargs):
super().__init__(path, station, statistics_per_var, **kwargs)
self.station_type = station_type
self.network = network
self.sampling = sampling
self.target_dim = target_dim
self.target_var = target_var
self.interpolate_dim = interpolate_dim
self.window_history_size = window_history_size
self.window_lead_time = window_lead_time
self.make_samples()
def get_transposed_history(self) -> xr.DataArray:
"""Return history.
:return: history with dimensions datetime, window, Stations, variables.
"""
return self.history.transpose("datetime", "window", "Stations", "variables").copy()
def get_transposed_label(self) -> xr.DataArray:
"""Return label.
        :return: label with dimensions datetime, window (the Stations dimension is squeezed out).
"""
return self.label.squeeze("Stations").transpose("datetime", "window").copy()
def get_x(self):
return self.get_transposed_history()
def get_y(self):
return self.get_transposed_label()
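    # Shape sketch (hypothetical setup): with window_history_size=7 and three input
    # variables, get_x() yields dims ('datetime', 'window', 'Stations', 'variables')
    # with a window length of 8 (steps -7..0); get_y() yields ('datetime', 'window')
    # with window length window_lead_time.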
def make_samples(self):
self.load_data()
self.make_history_window(self.target_dim, self.window_history_size, self.interpolate_dim)
self.make_labels(self.target_dim, self.target_var, self.interpolate_dim, self.window_lead_time)
self.make_observation(self.target_dim, self.target_var, self.interpolate_dim)
self.remove_nan(self.interpolate_dim)
def read_data_from_disk(self, source_name=""):
"""
        Load data and meta data either from local disk (preferred) or download new data using a custom download method.
        Data is downloaded if no local data is available or if the parameter overwrite_local_data is true. In both
        cases, downloaded data is only stored locally if store_data_locally is not disabled. If this parameter is not
        set, it is assumed that data should be saved locally.
"""
source_name = source_name if len(source_name) == 0 else f" from {source_name}"
check_path_and_create(self.path)
file_name = self._set_file_name()
meta_file = self._set_meta_file_name()
if self.kwargs.get('overwrite_local_data', False):
logging.debug(f"overwrite_local_data is true, therefore reload {file_name}{source_name}")
if os.path.exists(file_name):
os.remove(file_name)
if os.path.exists(meta_file):
os.remove(meta_file)
data, self.meta = self.download_data(file_name, meta_file)
logging.debug(f"loaded new data{source_name}")
else:
try:
logging.debug(f"try to load local data from: {file_name}")
data = xr.open_dataarray(file_name)
self.meta = pd.read_csv(meta_file, index_col=0)
self.check_station_meta()
logging.debug("loading finished")
except FileNotFoundError as e:
logging.debug(e)
logging.debug(f"load new data{source_name}")
data, self.meta = self.download_data(file_name, meta_file)
logging.debug("loading finished")
# create slices and check for negative concentration.
data = self._slice_prep(data)
self.data = self.check_for_negative_concentrations(data)
def download_data_from_join(self, file_name: str, meta_file: str) -> [xr.DataArray, pd.DataFrame]:
"""
Download data from TOAR database using the JOIN interface.
        Data is transformed to an xarray dataset. If the class attribute store_data_locally is true, data is
        additionally stored locally using the given names for file and meta file.
:param file_name: name of file to save data to (containing full path)
:param meta_file: name of the meta data file (also containing full path)
:return: downloaded data and its meta data
"""
df_all = {}
df, meta = join.download_join(station_name=self.station, stat_var=self.statistics_per_var,
station_type=self.station_type, network_name=self.network, sampling=self.sampling)
df_all[self.station[0]] = df
# convert df_all to xarray
xarr = {k: xr.DataArray(v, dims=['datetime', 'variables']) for k, v in df_all.items()}
xarr = xr.Dataset(xarr).to_array(dim='Stations')
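        # the resulting DataArray has dimensions ('Stations', 'datetime', 'variables')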
if self.kwargs.get('store_data_locally', True):
# save locally as nc/csv file
xarr.to_netcdf(path=file_name)
meta.to_csv(meta_file)
return xarr, meta
def download_data(self, file_name, meta_file):
data, meta = self.download_data_from_join(file_name, meta_file)
return data, meta
def check_station_meta(self):
"""
        Search for the entries in the meta data and compare them with the requested values.
        Raise a FileNotFoundError if the values mismatch.
"""
if self.station_type is not None:
check_dict = {"station_type": self.station_type, "network_name": self.network}
for (k, v) in check_dict.items():
if v is None:
continue
if self.meta.at[k, self.station[0]] != v:
logging.debug(f"meta data does not agree with given request for {k}: {v} (requested) != "
f"{self.meta.at[k, self.station[0]]} (local). Raise FileNotFoundError to trigger new "
f"grapping from web.")
raise FileNotFoundError
class AbstractDataPrep(object):
    """
    This class prepares data to be used in neural networks.
@@ -554,5 +935,13 @@ class AbstractDataPrep(object):
if __name__ == "__main__":
    # dp = AbstractDataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'], statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})
    # print(dp)
statistics_per_var = {'o3': 'dma8eu', 'temp-rea-miub': 'maximum'}
sp = StationPrep(path='/home/felix/PycharmProjects/mlt_new/data/', station='DEBY122',
statistics_per_var=statistics_per_var, station_type='background',
network='UBA', sampling='daily', target_dim='variables', target_var='o3',
interpolate_dim='datetime', window_history_size=7, window_lead_time=3)
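    # Note: StationPrep.__init__ already calls make_samples(), which loads the data;
    # the explicit calls below re-exercise the load and download paths.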
sp.load_data()
sp.download_data('newfile.nc', 'new_meta.csv')
print(sp)