Select Git revision
data_preparation.py
data_preparation.py 3.39 KiB
# Module metadata: authors and date of this file.
__author__ = 'Felix Kleinert, Lukas Leufen'
__date__ = '2019-10-16'
import xarray as xr
import pandas as pd
import logging
import os
from src import join
class DataPrep:
    """Load the data of a station and keep it available for later preprocessing.

    On construction the data is read from a local netCDF file and the meta
    information from a csv file next to it. If one of these files is missing,
    the data is requested through ``join.download_join`` and both files are
    written afterwards. All preprocessing methods (interpolation,
    standardisation, history and label creation) are declared but not
    implemented yet.
    """

    def __init__(self, path: str, network: str, stations, variables, **kwargs):
        """Store the settings and load the data.

        :param path: directory used to read and write the local data files
        :param network: name of the network (stored, not used for loading here)
        :param stations: station id, either as a string or as a list of strings
        :param variables: names of the variables; used to build the file names
        :param kwargs: additional settings; ``statistics_per_var`` is required
            and is handed to ``join.download_join`` as ``statvar``
        :raises NotImplementedError: if ``statistics_per_var`` is not given
        """
        self.path = path
        self.network = network
        self.stations = stations
        self.variables = variables
        self.kwargs = kwargs
        self.statistics_per_var = kwargs.get("statistics_per_var", None)
        # Initialise every attribute BEFORE loading. Resetting data and meta
        # after load_data() would throw away what has just been loaded.
        self.mean = None
        self.std = None
        self.df = None
        self.history = None
        self.label = None
        self.data = None
        self.meta = None
        if self.statistics_per_var is None:
            # TODO: loading hourly data from csv (without statistics_per_var) is not implemented yet
            raise NotImplementedError
        self.load_data()

    def load_data(self):
        """Read data and meta information from disk or download them.

        The local files are tried first. If a file is missing, a warning is
        logged, the data is requested via ``join.download_join``, converted to
        an xarray object with a ``Stations`` dimension and stored locally
        together with the meta information.
        """
        self.check_path_and_create(self.path)
        file_name = self._set_file_name()
        meta_file = self._set_meta_file_name()
        try:
            self.data = xr.open_dataarray(file_name)
            self.meta = pd.read_csv(meta_file, index_col=0)
        except FileNotFoundError as e:
            logging.warning(e)
            df, self.meta = join.download_join(station_name=self.stations, statvar=self.statistics_per_var)
            # stations may be a plain string; indexing it would only return its first character
            station = self.stations if isinstance(self.stations, str) else self.stations[0]
            df_all = {station: df}
            # convert df_all to xarray
            xarr = {k: xr.DataArray(v, dims=['datetime', 'variables']) for k, v in df_all.items()}
            xarr = xr.Dataset(xarr).to_array(dim='Stations')
            self.data = xarr
            # save locally as nc file
            xarr.to_netcdf(path=file_name)
            self.meta.to_csv(meta_file)

    def _set_file_name(self):
        """Return the location of the netCDF data file inside ``self.path``."""
        # os.path.join inserts the separator only if path does not already end with one
        return os.path.join(self.path, f"{''.join(self.stations)}_{'_'.join(sorted(self.variables))}.nc")

    def _set_meta_file_name(self):
        """Return the location of the csv meta file inside ``self.path``."""
        return os.path.join(self.path, f"{''.join(self.stations)}_{'_'.join(sorted(self.variables))}_meta.csv")

    def __repr__(self):
        """Return a string that shows the arguments this object was created with."""
        return f"DataPrep(path='{self.path}', network='{self.network}', stations={self.stations}, " \
               f"variables={self.variables}, **{self.kwargs})"

    @staticmethod
    def check_path_and_create(path):
        """Create ``path`` (including parents) if it does not exist yet.

        A successful creation is logged; an already existing path is silently
        accepted.
        """
        try:
            os.makedirs(path)
            logging.info("Created path: {}".format(path))
        except FileExistsError:
            # path is already there, nothing to do
            pass

    def interpolate(self, dim=None, method='linear', limit=None, use_coordinate=True, **kwargs):
        """Not implemented yet."""
        raise NotImplementedError

    def restandardise(self, data, dim='variables', **kwargs):
        """Not implemented yet."""
        raise NotImplementedError

    def standardise(self, dim):
        """Not implemented yet."""
        raise NotImplementedError

    def make_history_window(self, dim, window):
        """Not implemented yet."""
        raise NotImplementedError

    def shift(self, dim, window):
        """Not implemented yet."""
        raise NotImplementedError

    def make_labels(self, dimension_name_of_target, target_variable, dimension_name_of_shift, window):
        """Not implemented yet."""
        raise NotImplementedError

    def history_label_nan_remove(self, dim):
        """Not implemented yet."""
        raise NotImplementedError

    @staticmethod
    def create_indexarray(index_name, index_values):
        """Not implemented yet."""
        raise NotImplementedError
if __name__ == "__main__":
    # Manual check: build a DataPrep object for one station and print its representation.
    statistics = {'o3': 'dma8eu', 'temp': 'maximum'}
    data_prep = DataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'], statistics_per_var=statistics)
    print(data_prep)