Skip to content
Snippets Groups Projects
Commit 558e123d authored by l.leufen's avatar l.leufen
Browse files

first dataprep and join functions

parent e801995b
No related branches found
No related tags found
2 merge requests!6updated inception model and data prep class,!4data prep class
Keras==2.2.4 Keras==2.2.4
numpy==1.15.4 numpy==1.15.4
tensorflow==1.12.0 tensorflow==1.12.0
xarray
pandas
requests
\ No newline at end of file
__author__ = 'Felix Kleinert, Lukas Leufen'
__date__ = '2019-10-16'
import xarray as xr
import pandas as pd
import logging
import os
class DataPrep:
    """
    Prepare station time series for model input.

    If ``statistics_per_var`` is supplied via kwargs, daily statistics are
    loaded from a local netCDF cache (or downloaded from JOIN on a cache
    miss). Otherwise hourly data are read via an external helper.

    :param path: directory for cached data files (trailing separator expected,
        since file names are built by plain string concatenation)
    :param network: measurement network identifier (stored, not used here)
    :param stations: list of station names, e.g. ['DEBW107']
    :param variables: list of variable names, e.g. ['o3', 'temp']
    :param kwargs: may contain 'statistics_per_var', a dict mapping variable
        name -> statistic name understood by the JOIN 'stats' service
    """

    def __init__(self, path, network, stations, variables, **kwargs):
        self.path = path
        self.network = network
        self.stations = stations
        self.variables = variables
        self.statistics_per_var = kwargs.get("statistics_per_var", None)
        if self.statistics_per_var is not None:
            self.load_data()
        else:
            # NOTE(review): `Fkf` is neither defined nor imported in this file —
            # confirm which helper module provides read_hourly_data_from_csv_to_xarray.
            self.data, self.meta = Fkf.read_hourly_data_from_csv_to_xarray(self.path, self.network, self.stations,
                                                                           self.variables, **kwargs)
        # placeholders filled by the (not yet implemented) preparation steps
        self.mean = None
        self.std = None
        self.df = None
        self.history = None
        self.label = None
        self.kwargs = kwargs

    def load_data(self):
        """Load cached data from disk; on a cache miss, download via JOIN and cache it."""
        self.check_path_and_create(self.path)
        file_name = "{}{}_{}.nc".format(self.path, ''.join(self.stations), '_'.join(sorted(self.variables)))
        meta_file = "{}{}_{}_meta.csv".format(self.path, ''.join(self.stations), '_'.join(sorted(self.variables)))
        try:
            self.data = xr.open_dataarray(file_name)
            self.meta = pd.read_csv(meta_file, index_col=0)
        except FileNotFoundError as e:
            # BUG FIX: opening a missing file raises FileNotFoundError, not
            # FileExistsError — with the old except clause the download
            # fallback below was never reached.
            logging.warning(e)
            df_all = {}
            df, self.meta = Fkf.download_join(station_name=self.stations, statvar=self.statistics_per_var)
            df_all[self.stations[0]] = df
            # convert df_all to xarray
            xarr = {k: xr.DataArray(v, dims=['datetime', 'variables']) for k, v in df_all.items()}
            xarr = xr.Dataset(xarr).to_array(dim='Stations')
            self.data = xarr
            # save locally as nc file
            xarr.to_netcdf(path=file_name)
            self.meta.to_csv(meta_file)

    def __repr__(self):
        # BUG FIX: the original format string was missing the closing ')'.
        return "DataPrep(path='{}', network='{}', stations={}, variables={}, **{})".format(self.path, self.network,
                                                                                           self.stations,
                                                                                           self.variables,
                                                                                           self.kwargs)

    @staticmethod
    def check_path_and_create(path):
        """
        Create ``path`` (including parents) if it does not exist yet.

        :param path: directory path to ensure
        :return: None
        """
        try:
            os.makedirs(path)
            logging.info("Created path: {}".format(path))
        except FileExistsError:
            # directory already present — nothing to do
            pass

    def interpolate(self, dim=None, method='linear', limit=None, use_coordinate=True, **kwargs):
        raise NotImplementedError

    def restandardise(self, data, dim='variables', **kwargs):
        raise NotImplementedError

    def standardise(self, dim):
        raise NotImplementedError

    def make_history_window(self, dim, window):
        raise NotImplementedError

    def shift(self, dim, window):
        raise NotImplementedError

    def make_labels(self, dimension_name_of_target, target_variable, dimension_name_of_shift, window):
        raise NotImplementedError

    def history_label_nan_remove(self, dim):
        raise NotImplementedError

    @staticmethod
    def create_indexarray(index_name, index_values):
        raise NotImplementedError
__author__ = 'Felix Kleinert, Lukas Leufen' __author__ = 'Felix Kleinert, Lukas Leufen'
__date__ = '2019-10-15'
import keras import keras
from keras.layers import Input, Dense, Conv2D, MaxPooling2D, AveragePooling2D, ZeroPadding2D, Dropout, Flatten, \ from keras.layers import Input, Dense, Conv2D, MaxPooling2D, AveragePooling2D, ZeroPadding2D, Dropout, Flatten, \
......
__author__ = 'Felix Kleinert, Lukas Leufen'
__date__ = '2019-10-16'
import requests
import json
import logging
import pandas as pd
import datetime as dt
join_url_base = 'https://join.fz-juelich.de/services/rest/surfacedata/'
logging.basicConfig(level=logging.INFO)
def download_join(station_name, statvar):
    """
    Read/download daily statistics from the JOIN/TOAR REST service.

    :param station_name: station name as string (e.g. 'DEBY122') or a list of
        station names; only the first entry is queried
    :param statvar: dict with variable names as keys (e.g. 'o3') and the
        requested statistic as value (e.g. 'dma8eu', 'average_values')
    :return: tuple ``(df, meta)`` — ``df`` is a pandas DataFrame with one
        column per downloaded variable indexed by datetime; ``meta`` is a
        DataFrame built from the metadata of the last downloaded series
    :raises ValueError: if none of the requested variables is available
    """
    # make sure station_name parameter is a list
    if not isinstance(station_name, list):
        station_name = [station_name]
    # load series information: which variables exist for this station
    opts = {'base': join_url_base, 'service': 'series', 'station_id': station_name[0]}
    url = create_url(**opts)
    response = requests.get(url)
    station_vars = response.json()
    # each item is a series record; item[0] is the series id, item[3] the variable name
    vars_dict = {item[3].lower(): item[0] for item in station_vars}
    # download all variables with given statistic
    data = None
    df = None
    for var in _lower_list(sorted(vars_dict.keys())):
        if var in statvar:
            logging.info('load: {}'.format(var))
            # create data link
            opts = {'base': join_url_base, 'service': 'stats', 'id': vars_dict[var], 'statistics': statvar[var],
                    'sampling': 'daily', 'capture': 0, 'min_data_length': 1460}
            url = create_url(**opts)
            # load data
            response = requests.get(url)
            data = response.json()
            # correct namespace of statistics
            stat = _correct_stat_name(statvar[var])
            # store data in pandas dataframe
            index = map(lambda s: dt.datetime.strptime(s, "%Y-%m-%d %H:%M"), data['datetime'])
            if df is None:
                df = pd.DataFrame(data[stat], index=index, columns=[var])
            else:
                df = pd.concat([df, pd.DataFrame(data[stat], index=index, columns=[var])], axis=1)
            logging.debug('finished: {}'.format(var))
    if data:
        meta = pd.DataFrame.from_dict(data['metadata'], orient='index')
        meta.columns = station_name
        return df, meta
    else:
        raise ValueError("No data found in JOIN.")
def _correct_stat_name(stat):
mapping = {'average_values': 'mean', 'maximum': 'max', 'minimum': 'min'}
return mapping.get(stat, stat)
def _lower_list(args):
for string in args:
yield string.lower()
def create_url(base, service, **kwargs):
    """
    Build a JOIN REST request URL.

    :param base: base URL of the REST service (trailing slash expected)
    :param service: service endpoint, e.g. 'series' or 'stats'
    :param kwargs: query parameters appended as key=value pairs
    :return: the assembled URL string
    """
    from urllib.parse import urlencode  # local import keeps the module header untouched
    # urlencode percent-escapes parameter values (the original plain '&'.join
    # did not); for the simple ints/strings used here the output is identical.
    url = '{}{}/?{}'.format(base, service, urlencode(kwargs))
    return url
# Ad-hoc smoke test: download daily statistics for a single station from JOIN.
if __name__ == "__main__":
    # variable name -> statistic understood by the JOIN 'stats' service
    var_all_dic = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values',
                   'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values',
                   'pblheight': 'maximum'}
    station = 'DEBW107'
    # return value is discarded here; this only exercises the download path
    download_join(station, var_all_dic)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment