__author__ = 'Felix Kleinert, Lukas Leufen'
__date__ = '2019-10-16'

import logging
import os

import pandas as pd


class DataPrep:
    """
    Load and hold observation data for a set of stations and variables.

    When `statistics_per_var` is given, data is read from a locally cached
    NetCDF/CSV pair if present and otherwise downloaded from JOIN and cached
    (see :meth:`load_data`). Otherwise the data is read from hourly CSV files.
    """

    def __init__(self, path, network, stations, variables, **kwargs):
        """
        :param path: directory holding (or receiving) the cached data files
        :param network: measurement network name (e.g. 'AIRBASE')
        :param stations: list of station ids (e.g. ['DEBW107'])
        :param variables: list of variable names (e.g. ['o3', 'temp'])
        :param kwargs: optional settings; 'statistics_per_var' (dict) selects
            the JOIN download path, everything else is forwarded unchanged
        """
        self.path = path
        self.network = network
        self.stations = stations
        self.variables = variables
        self.statistics_per_var = kwargs.get("statistics_per_var", None)
        # placeholders filled by the (not yet implemented) processing steps
        self.mean = None
        self.std = None
        self.df = None
        self.history = None
        self.label = None
        self.kwargs = kwargs
        if self.statistics_per_var is not None:
            self.load_data()
        else:
            # NOTE(review): `Fkf` is neither defined nor imported anywhere in
            # this file — as written this branch raises NameError. Presumably a
            # helper module import is missing; confirm the intended module.
            self.data, self.meta = Fkf.read_hourly_data_from_csv_to_xarray(
                self.path, self.network, self.stations, self.variables, **kwargs)

    def load_data(self):
        """
        Load station data from the local cache, or download it from JOIN.

        Tries to open ``<path>/<stations>_<vars>.nc`` plus the matching
        ``*_meta.csv``; on a cache miss the data is downloaded via
        ``Fkf.download_join`` and written back to both files.
        """
        import xarray as xr  # deferred: keeps the module importable without xarray

        self.check_path_and_create(self.path)
        # BUGFIX: use os.path.join instead of raw string concatenation — the
        # old "{}{}" pattern silently produced e.g. "dataDEBW107_....nc" when
        # `path` had no trailing separator.
        prefix = "{}_{}".format(''.join(self.stations), '_'.join(sorted(self.variables)))
        file_name = os.path.join(self.path, "{}.nc".format(prefix))
        meta_file = os.path.join(self.path, "{}_meta.csv".format(prefix))
        try:
            self.data = xr.open_dataarray(file_name)
            self.meta = pd.read_csv(meta_file, index_col=0)
        except FileNotFoundError as e:
            # BUGFIX: a cache miss raises FileNotFoundError — the previous
            # `except FileExistsError` could never trigger, so the download
            # fallback below was unreachable.
            logging.warning(e)
            df_all = {}
            # NOTE(review): `Fkf` is not defined in this file (see __init__);
            # confirm the missing import.
            df, self.meta = Fkf.download_join(station_name=self.stations,
                                              statvar=self.statistics_per_var)
            df_all[self.stations[0]] = df
            # convert the per-station frames into one DataArray with a leading
            # 'Stations' dimension
            xarr = {k: xr.DataArray(v, dims=['datetime', 'variables']) for k, v in df_all.items()}
            xarr = xr.Dataset(xarr).to_array(dim='Stations')
            self.data = xarr
            # cache locally so the next run hits the NetCDF/CSV fast path
            xarr.to_netcdf(path=file_name)
            self.meta.to_csv(meta_file)

    def __repr__(self):
        # BUGFIX: the closing parenthesis was missing from the template.
        return "DataPrep(path='{}', network='{}', stations={}, variables={}, **{})".format(
            self.path, self.network, self.stations, self.variables, self.kwargs)

    @staticmethod
    def check_path_and_create(path):
        """
        Create `path` (including parent directories) if it does not exist yet.

        Logs only on actual creation; an already existing path is silently
        accepted (EAFP, race-free).

        :param path: directory to create
        """
        try:
            os.makedirs(path)
            logging.info("Created path: {}".format(path))
        except FileExistsError:
            pass

    def interpolate(self, dim=None, method='linear', limit=None, use_coordinate=True, **kwargs):
        # planned: gap filling along `dim`, mirroring xarray's interpolate_na signature
        raise NotImplementedError

    def restandardise(self, data, dim='variables', **kwargs):
        # planned: invert a previous standardisation using stored mean/std
        raise NotImplementedError

    def standardise(self, dim):
        # planned: zero-mean / unit-variance scaling along `dim`
        raise NotImplementedError

    def make_history_window(self, dim, window):
        # planned: build the lagged input window (history) tensor
        raise NotImplementedError

    def shift(self, dim, window):
        # planned: shift data along `dim` by `window` steps
        raise NotImplementedError

    def make_labels(self, dimension_name_of_target, target_variable, dimension_name_of_shift, window):
        # planned: extract forecast targets (labels) from the data
        raise NotImplementedError

    def history_label_nan_remove(self, dim):
        # planned: drop samples whose history or label contains NaN
        raise NotImplementedError

    @staticmethod
    def create_indexarray(index_name, index_values):
        # planned: helper creating a named index DataArray
        raise NotImplementedError
mode 100644 index 00000000..497d642c --- /dev/null +++ b/src/join.py @@ -0,0 +1,93 @@ +__author__ = 'Felix Kleinert, Lukas Leufen' +__date__ = '2019-10-16' + + +import requests +import json +import logging +import pandas as pd +import datetime as dt + +join_url_base = 'https://join.fz-juelich.de/services/rest/surfacedata/' +logging.basicConfig(level=logging.INFO) + +def download_join(station_name, statvar): + + """ + Diese Funktion liest/downloaded daten von JOIN/TOAR + Input: + param: station_name, string: Station name e.g. DEBY122 + param: statvar , dict: key as variable like 'O3', values as statistics on keys like 'mean' + + Output: + # df: pandas df incl. variables & statistics + + """ + # make sure station_name parameter is a list + if not isinstance(station_name, list): + station_name = [station_name] + + # load series information + opts = {'base': join_url_base, 'service': 'series', 'station_id': station_name[0]} + url = create_url(**opts) + response = requests.get(url) + station_vars = response.json() + vars_dict = {item[3].lower(): item[0] for item in station_vars} + + # download all variables with given statistic + data = None + df = None + for var in _lower_list(sorted(vars_dict.keys())): + if var in statvar.keys(): + logging.info('load: {}'.format(var)) + + # create data link + opts = {'base': join_url_base, 'service': 'stats', 'id': vars_dict[var], 'statistics': statvar[var], + 'sampling': 'daily', 'capture': 0, 'min_data_length': 1460} + url = create_url(**opts) + + # load data + response = requests.get(url) + data = response.json() + + # correct namespace of statistics + stat = _correct_stat_name(statvar[var]) + + # store data in pandas dataframe + index = map(lambda s: dt.datetime.strptime(s, "%Y-%m-%d %H:%M"), data['datetime']) + if df is None: + df = pd.DataFrame(data[stat], index=index, columns=[var]) + else: + df = pd.concat([df, pd.DataFrame(data[stat], index=index, columns=[var])], axis=1) + logging.debug('finished: {}'.format(var)) + + if 
data: + meta = pd.DataFrame.from_dict(data['metadata'], orient='index') + meta.columns = station_name + return df, meta + else: + raise ValueError("No data found in JOIN.") + + +def _correct_stat_name(stat): + mapping = {'average_values': 'mean', 'maximum': 'max', 'minimum': 'min'} + return mapping.get(stat, stat) + + +def _lower_list(args): + for string in args: + yield string.lower() + + +def create_url(base, service, **kwargs): + + url = '{}{}/?'.format(base, service) + '&'.join('{}={}'.format(k, v) for k, v in kwargs.items()) + return url + + +if __name__ == "__main__": + var_all_dic = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values', + 'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values', + 'pblheight': 'maximum'} + station = 'DEBW107' + download_join(station, var_all_dic) -- GitLab