Commit 50e6d0bf authored by lukas leufen


removed network parameter; toar_data loads from both v2 and v1 and combines data; data is now in local time (without DST); the data handler now raises an error when no data intersection is available
parent ee34c882
Pipeline #104998 failed with stages in 10 minutes and 52 seconds
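
A minimal usage sketch of the headline change (module path, station id, data path and variable choice are placeholders, not taken from this commit; MLAir and local data are assumed to be available): the single-station data handler is now created without a network keyword, while station_type and data_origin remain the way to steer what is downloaded.

from mlair.data_handler.data_handler_single_station import DataHandlerSingleStation

# network=... is no longer part of the interface; toar-data v2 and v1 (JOIN) are queried
# and combined automatically by the download backend.
dh = DataHandlerSingleStation(station=["DEBW107"], data_path="testdata/",
                              statistics_per_var={"o3": "dma8eu", "temp": "maximum"},
                              station_type="background", sampling="daily")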
......@@ -9,6 +9,7 @@ pytest-lazy-fixture==0.6.3
pytest-metadata==1.11.0
pytest-sugar==0.9.4
tabulate==0.8.8
timezonefinder==5.2.0
wget==3.2
--no-binary shapely Shapely==1.7.0
......
......@@ -9,6 +9,7 @@ pytest-lazy-fixture==0.6.3
pytest-metadata==1.11.0
pytest-sugar==0.9.4
tabulate==0.8.8
timezonefinder==5.2.0
wget==3.2
--no-binary shapely Shapely==1.7.0
......
......@@ -9,7 +9,6 @@ DEFAULT_STATIONS = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087']
DEFAULT_VAR_ALL_DICT = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values',
'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values',
'pblheight': 'maximum'}
DEFAULT_NETWORK = "AIRBASE"
DEFAULT_STATION_TYPE = "background"
DEFAULT_VARIABLES = DEFAULT_VAR_ALL_DICT.keys()
DEFAULT_START = "1997-01-01"
......
......@@ -63,8 +63,8 @@ class DataHandlerMixedSamplingSingleStation(DataHandlerSingleStation):
vars = [self.variables, self.target_var]
stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind])
data, self.meta = self.load_data(self.path[ind], self.station, stats_per_var, self.sampling[ind],
self.station_type, self.network, self.store_data_locally, self.data_origin,
self.start, self.end)
self.station_type, self.store_data_locally, self.data_origin, self.start,
self.end)
data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method[ind],
limit=self.interpolation_limit[ind], sampling=self.sampling[ind])
......@@ -147,8 +147,7 @@ class DataHandlerMixedSamplingWithFilterSingleStation(DataHandlerMixedSamplingSi
stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind])
data, self.meta = self.load_data(self.path[ind], self.station, stats_per_var, self.sampling[ind],
self.station_type, self.network, self.store_data_locally, self.data_origin,
start, end)
self.station_type, self.store_data_locally, self.data_origin, start, end)
data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method[ind],
limit=self.interpolation_limit[ind], sampling=self.sampling[ind])
return data
......
......@@ -40,7 +40,6 @@ class DataHandlerSingleStation(AbstractDataHandler):
is 0.
"""
DEFAULT_STATION_TYPE = "background"
DEFAULT_NETWORK = "AIRBASE"
DEFAULT_VAR_ALL_DICT = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values',
'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values',
'pblheight': 'maximum'}
......@@ -59,16 +58,16 @@ class DataHandlerSingleStation(AbstractDataHandler):
chem_vars = ["benzene", "ch4", "co", "ethane", "no", "no2", "nox", "o3", "ox", "pm1", "pm10", "pm2p5", "propane",
"so2", "toluene"]
_hash = ["station", "statistics_per_var", "data_origin", "station_type", "network", "sampling", "target_dim",
"target_var", "time_dim", "iter_dim", "window_dim", "window_history_size", "window_history_offset",
"window_lead_time", "interpolation_limit", "interpolation_method", "variables", "window_history_end"]
_hash = ["station", "statistics_per_var", "data_origin", "station_type", "sampling", "target_dim", "target_var",
"time_dim", "iter_dim", "window_dim", "window_history_size", "window_history_offset", "window_lead_time",
"interpolation_limit", "interpolation_method", "variables", "window_history_end"]
def __init__(self, station, data_path, statistics_per_var=None, station_type=DEFAULT_STATION_TYPE,
network=DEFAULT_NETWORK, sampling: Union[str, Tuple[str]] = DEFAULT_SAMPLING,
target_dim=DEFAULT_TARGET_DIM, target_var=DEFAULT_TARGET_VAR, time_dim=DEFAULT_TIME_DIM,
iter_dim=DEFAULT_ITER_DIM, window_dim=DEFAULT_WINDOW_DIM,
window_history_size=DEFAULT_WINDOW_HISTORY_SIZE, window_history_offset=DEFAULT_WINDOW_HISTORY_OFFSET,
window_history_end=DEFAULT_WINDOW_HISTORY_END, window_lead_time=DEFAULT_WINDOW_LEAD_TIME,
sampling: Union[str, Tuple[str]] = DEFAULT_SAMPLING, target_dim=DEFAULT_TARGET_DIM,
target_var=DEFAULT_TARGET_VAR, time_dim=DEFAULT_TIME_DIM, iter_dim=DEFAULT_ITER_DIM,
window_dim=DEFAULT_WINDOW_DIM, window_history_size=DEFAULT_WINDOW_HISTORY_SIZE,
window_history_offset=DEFAULT_WINDOW_HISTORY_OFFSET, window_history_end=DEFAULT_WINDOW_HISTORY_END,
window_lead_time=DEFAULT_WINDOW_LEAD_TIME,
interpolation_limit: Union[int, Tuple[int]] = DEFAULT_INTERPOLATION_LIMIT,
interpolation_method: Union[str, Tuple[str]] = DEFAULT_INTERPOLATION_METHOD,
overwrite_local_data: bool = False, transformation=None, store_data_locally: bool = True,
......@@ -89,7 +88,6 @@ class DataHandlerSingleStation(AbstractDataHandler):
self._transformation = self.setup_transformation(transformation)
self.station_type = station_type
self.network = network
self.sampling = sampling
self.target_dim = target_dim
self.target_var = target_var
......@@ -141,9 +139,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
return self._data.shape, self.get_X().shape, self.get_Y().shape
def __repr__(self):
return f"StationPrep(station={self.station}, data_path='{self.path}', " \
f"statistics_per_var={self.statistics_per_var}, " \
f"station_type='{self.station_type}', network='{self.network}', " \
return f"StationPrep(station={self.station}, data_path='{self.path}', data_origin={self.data_origin}, " \
f"statistics_per_var={self.statistics_per_var}, station_type='{self.station_type}', " \
f"sampling='{self.sampling}', target_dim='{self.target_dim}', target_var='{self.target_var}', " \
f"time_dim='{self.time_dim}', window_history_size={self.window_history_size}, " \
f"window_lead_time={self.window_lead_time}, interpolation_limit={self.interpolation_limit}, " \
......@@ -170,8 +167,12 @@ class DataHandlerSingleStation(AbstractDataHandler):
return self.get_transposed_label()
def get_coordinates(self):
coords = self.meta.loc[["station_lon", "station_lat"]].astype(float)
return coords.rename(index={"station_lon": "lon", "station_lat": "lat"}).to_dict()[str(self)]
try:
coords = self.meta.loc[["station_lon", "station_lat"]].astype(float)
coords = coords.rename(index={"station_lon": "lon", "station_lat": "lat"})
except KeyError:
coords = self.meta.loc[["lon", "lat"]].astype(float)
return coords.to_dict()[str(self)]
def call_transform(self, inverse=False):
opts_input = self._transformation[0]
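
A toy sketch of the two meta layouts that get_coordinates now accepts (frames are made up for illustration): the JOIN/v1 layout indexed by station_lon/station_lat and the toar-data v2 layout indexed by lon/lat.

import pandas as pd

meta_v1 = pd.DataFrame({"DEBW107": {"station_lon": "9.20", "station_lat": "48.72"}})
meta_v2 = pd.DataFrame({"DEBW107": {"lon": "9.20", "lat": "48.72"}})
# with self.meta set to either frame, get_coordinates resolves to
# {"lon": 9.2, "lat": 48.72} for str(self) == "DEBW107"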
......@@ -302,7 +303,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
def make_input_target(self):
data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling,
self.station_type, self.network, self.store_data_locally, self.data_origin,
self.station_type, self.store_data_locally, self.data_origin,
self.start, self.end)
self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method,
limit=self.interpolation_limit, sampling=self.sampling)
......@@ -321,8 +322,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
self.make_observation(self.target_dim, self.target_var, self.time_dim)
self.remove_nan(self.time_dim)
def load_data(self, path, station, statistics_per_var, sampling, station_type=None, network=None,
store_data_locally=False, data_origin: Dict = None, start=None, end=None):
def load_data(self, path, station, statistics_per_var, sampling, station_type=None, store_data_locally=False,
data_origin: Dict = None, start=None, end=None):
"""
Load data and meta data either from local disk (preferred) or download new data by using a custom download method.
......@@ -340,9 +341,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
if os.path.exists(meta_file):
os.remove(meta_file)
data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling,
station_type=station_type, network=network,
store_data_locally=store_data_locally, data_origin=data_origin,
time_dim=self.time_dim, target_dim=self.target_dim,
station_type=station_type, store_data_locally=store_data_locally,
data_origin=data_origin, time_dim=self.time_dim, target_dim=self.target_dim,
iter_dim=self.iter_dim)
logging.debug(f"loaded new data")
else:
......@@ -350,26 +350,24 @@ class DataHandlerSingleStation(AbstractDataHandler):
logging.debug(f"try to load local data from: {file_name}")
data = xr.open_dataarray(file_name)
meta = pd.read_csv(meta_file, index_col=0)
self.check_station_meta(meta, station, station_type, network, data_origin)
self.check_station_meta(meta, station, station_type, data_origin)
logging.debug("loading finished")
except FileNotFoundError as e:
logging.debug(e)
logging.debug(f"load new data")
data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling,
station_type=station_type, network=network,
store_data_locally=store_data_locally, data_origin=data_origin,
time_dim=self.time_dim, target_dim=self.target_dim,
iter_dim=self.iter_dim)
station_type=station_type, store_data_locally=store_data_locally,
data_origin=data_origin, time_dim=self.time_dim,
target_dim=self.target_dim, iter_dim=self.iter_dim)
logging.debug("loading finished")
# create slices and check for negative concentration.
data = self._slice_prep(data, start=start, end=end)
data = self.check_for_negative_concentrations(data)
return data, meta
def download_data(self, file_name: str, meta_file: str, station, statistics_per_var, sampling,
station_type=None, network=None, store_data_locally=True, data_origin: Dict = None,
time_dim=DEFAULT_TIME_DIM, target_dim=DEFAULT_TARGET_DIM, iter_dim=DEFAULT_ITER_DIM) \
-> [xr.DataArray, pd.DataFrame]:
def download_data(self, file_name: str, meta_file: str, station, statistics_per_var, sampling, station_type=None,
store_data_locally=True, data_origin: Dict = None, time_dim=DEFAULT_TIME_DIM,
target_dim=DEFAULT_TARGET_DIM, iter_dim=DEFAULT_ITER_DIM) -> [xr.DataArray, pd.DataFrame]:
"""
Download data from TOAR database using the JOIN interface or load local era5 data.
......@@ -382,8 +380,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
:return: downloaded data and its meta data
"""
df_all = {}
df_era5, df_toar, df_join = None, None, None
meta_era5, meta_toar, meta_join = None, None, None
df_era5, df_toar = None, None
meta_era5, meta_toar = None, None
if data_origin is not None:
era5_origin = filter_dict_by_value(data_origin, "era5", True)
era5_stats = select_from_dict(statistics_per_var, era5_origin.keys())
......@@ -401,23 +399,16 @@ class DataHandlerSingleStation(AbstractDataHandler):
df_era5, meta_era5 = data_sources.era5.load_era5(station_name=station, stat_var=era5_stats,
sampling=sampling, data_origin=era5_origin)
if toar_origin is None or len(toar_stats) > 0:
# load combined ata from toar-data (v2 & v1)
# load combined data from toar-data (v2 & v1)
df_toar, meta_toar = data_sources.toar_data.download_toar(station=station, toar_stats=toar_stats,
sampling=sampling, data_origin=toar_origin,
station_type=station_type)
# # load data from toar-data (v2)
# df_toar, meta_toar = toar_data.download_toar(station, toar_stats, sampling=sampling, data_origin=toar_origin)
#
# # load join data (toar-data v1)
# df_join, meta_join = join.download_join(station_name=station, stat_var=toar_stats, sampling=sampling,
# station_type=station_type, data_origin=toar_origin)
#
# # fill-up toar-data with join data
# a = 1
df = pd.concat([df_era5, df_toar], axis=1, sort=True)
meta = meta_era5 if meta_era5 is not None else meta_toar
if meta_era5 is not None and meta_toar is not None:
meta = meta_era5.combine_first(meta_toar)
else:
meta = meta_era5 if meta_era5 is not None else meta_toar
meta.loc["data_origin"] = str(data_origin)
df_all[station[0]] = df
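
A toy sketch of the new meta merge above (values are made up): combine_first keeps the ERA5 entries and only fills gaps from the toar meta data.

import pandas as pd

meta_era5 = pd.DataFrame({"DEBW107": {"station_lat": 48.72, "station_alt": None}})
meta_toar = pd.DataFrame({"DEBW107": {"station_lat": 48.80, "station_alt": 340.0}})
meta = meta_era5.combine_first(meta_toar)
# station_lat stays 48.72 (era5 wins), station_alt becomes 340.0 (filled from toar)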
......@@ -431,16 +422,16 @@ class DataHandlerSingleStation(AbstractDataHandler):
return xarr, meta
@staticmethod
def check_station_meta(meta, station, station_type, network, data_origin):
def check_station_meta(meta, station, station_type, data_origin):
"""
Search for the entries in meta data and compare the value with the requested values.
Will raise a FileNotFoundError if the values mismatch.
"""
if station_type is not None:
check_dict = {"station_type": station_type, "network_name": network, "data_origin": str(data_origin)}
check_dict = {"station_type": station_type, "type": station_type, "data_origin": str(data_origin)}
for (k, v) in check_dict.items():
if v is None or k not in meta:
if v is None or k not in meta.index:
continue
if meta.at[k, station[0]] != v:
logging.debug(f"meta data does not agree with given request for {k}: {v} (requested) != "
......
......@@ -68,8 +68,8 @@ class DataHandlerFilterSingleStation(DataHandlerSingleStation):
def make_input_target(self):
data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling,
self.station_type, self.network, self.store_data_locally, self.data_origin,
self.start, self.end)
self.station_type, self.store_data_locally, self.data_origin, self.start,
self.end)
self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method,
limit=self.interpolation_limit)
self.set_inputs_and_targets()
......
......@@ -168,7 +168,7 @@ class DefaultDataHandler(AbstractDataHandler):
dim = self.time_dim
intersect = reduce(np.intersect1d, map(lambda x: x.coords[dim].values, X_original))
if len(intersect) < max(self.min_length, 1):
X, Y = None, None
raise ValueError(f"There is no intersection of X.")
else:
X = list(map(lambda x: x.sel({dim: intersect}), X_original))
Y = Y_original.sel({dim: intersect})
......@@ -205,10 +205,6 @@ class DefaultDataHandler(AbstractDataHandler):
if True only extract values larger than extreme_values
:param timedelta: used as arguments for np.timedelta in order to mark extreme values on datetime
"""
# check if X or Y is None
if (self._X is None) or (self._Y is None):
logging.debug(f"{str(self.id_class)} has no data for X or Y, skip multiply extremes")
return
if extreme_values is None:
logging.debug(f"No extreme values given, skip multiply extremes")
self._X_extreme, self._Y_extreme = self._X, self._Y
......
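
A sketch of the behavioural change in DefaultDataHandler (caller and method name are assumptions, not part of this diff): a missing time intersection between inputs and targets now surfaces as a ValueError instead of silently setting X and Y to None, so calling code can decide to drop the station.

import logging

try:
    X, Y = data_handler.get_data(upsampling=False)  # hypothetical call that triggers the intersection check
except ValueError as e:
    logging.debug(f"remove station due to missing data intersection: {e}")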
......@@ -11,7 +11,7 @@ import xarray as xr
from mlair import helpers
from mlair.configuration.era5_settings import era5_settings
from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings
from mlair.helpers.data_sources.toar_data_v2 import load_station_information, combine_meta_data
from mlair.helpers.data_sources.toar_data_v2 import load_station_information, combine_meta_data, correct_timezone
from mlair.helpers.data_sources.toar_data import EmptyQueryResult
from mlair.helpers.meteo import relative_humidity_from_dewpoint
......@@ -56,6 +56,9 @@ def load_era5(station_name, stat_var, sampling, data_origin):
else:
station_data = station_data[stat_var]
# convert to local timezone
station_data = correct_timezone(station_data, station_meta, sampling)
variable_meta = _emulate_meta_data(station_data)
meta = combine_meta_data(station_meta, variable_meta)
meta = pd.DataFrame.from_dict(meta, orient='index')
......
......@@ -10,7 +10,7 @@ import pandas as pd
from mlair import helpers
from mlair.configuration.join_settings import join_settings
from mlair.helpers.data_sources import toar_data
from mlair.helpers.data_sources import toar_data, toar_data_v2
# join_url_base = 'https://join.fz-juelich.de/services/rest/surfacedata/'
......@@ -59,6 +59,7 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
# download all variables with given statistic
data = None
df = None
meta = {}
logging.info(f"load data for {station_name[0]} from JOIN")
for var in _lower_list(sorted(vars_dict.keys())):
if var in stat_var.keys():
......@@ -83,17 +84,53 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
# store data in pandas dataframe
df = _save_to_pandas(df, data, stat, var)
meta[var] = _correct_meta(data["metadata"])
logging.debug('finished: {}'.format(var))
if data:
meta = pd.DataFrame.from_dict(data['metadata'], orient='index')
# load station meta using toar-data v2 API and convert to local timezone
meta_url_base, headers = toar_data_v2.toar_data_v2_settings("meta")
station_meta = toar_data_v2.load_station_information(station_name, meta_url_base, headers)
df = toar_data_v2.correct_timezone(df, station_meta, sampling)
# create meta data
meta = toar_data_v2.combine_meta_data(station_meta, meta)
meta = pd.DataFrame.from_dict(meta, orient='index')
meta.columns = station_name
return df, meta
else:
raise toar_data.EmptyQueryResult("No data found in JOIN.")
def _correct_meta(meta):
meta_out = {}
for k, v in meta.items():
if k.startswith("station"):
_k = k.split("_", 1)[1]
_d = meta_out.get("station", {})
_d[_k] = v
meta_out["station"] = _d
elif k.startswith("parameter"):
_k = k.split("_", 1)[1]
_d = meta_out.get("variable", {})
_d[_k] = v
meta_out["variable"] = _d
elif k == "network_name":
if v == "AIRBASE":
_d = {"name": "EEA", "longname": "European Environment Agency", "kind": "government"}
elif v == "UBA":
_d = {"name": "UBA", "longname": "Umweltbundesamt", "kind": "government", "country": "Germany"}
else:
_d = {"name": v}
meta_out["roles"] = [{"contact": {"organisation": _d}}]
elif k in ["google_resolution", "numid"]:
continue
else:
meta_out[k] = v
return meta_out
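
A toy sketch (input dict is made up, module path assumed) of how _correct_meta regroups the flat JOIN (v1) meta keys into the nested toar-data v2 layout, so the same combine_meta_data helper can be reused:

from mlair.helpers.data_sources.join import _correct_meta  # assumed module path

meta_v1 = {"station_id": "DEBW107", "station_lon": 9.2, "parameter_name": "o3",
           "network_name": "AIRBASE", "numid": 42}
_correct_meta(meta_v1)
# -> {"station": {"id": "DEBW107", "lon": 9.2},
#     "variable": {"name": "o3"},
#     "roles": [{"contact": {"organisation": {"name": "EEA",
#              "longname": "European Environment Agency", "kind": "government"}}}]}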
def split_network_and_origin(origin_network_dict: dict) -> Tuple[Union[None, dict], Union[None, dict]]:
"""
Split given dict into network and data origin.
......@@ -157,24 +194,6 @@ def correct_data_format(data):
return formatted
def load_meta_data(station_name: List[str], station_type: str_or_none, network_name: str_or_none,
join_url_base: str, headers: Dict) -> [Dict, Dict]:
opts = {"base": join_url_base, "service": "search", "station_id": station_name[0], "station_type": station_type,
"network_name": network_name, "as_dict": "true",
"columns": "station_id,network_name,station_local_id,station_type,station_type_of_area,station_category,"
"station_name,station_country,station_state,station_lon,station_lat,station_alt,"
"station_timezone,station_nightlight_5km,station_climatic_zone,station_wheat_production,"
"station_rice_production,station_nox_emissions,station_omi_no2_column,station_toar_category,"
"station_htap_region,station_reported_alt,station_alt_flag,station_coordinate_status,"
"station_google_alt,station_etopo_alt,station_etopo_min_alt_5km,station_etopo_relative_alt,"
"station_dominant_landcover,station_landcover_description,station_max_nightlight_25km,"
"station_max_population_density_25km,station_nightlight_1km,station_population_density,"
"google_resolution,station_comments,station_max_population_density_5km"}
if network_name is None:
opts["columns"] = opts["columns"].replace(",network_name", "")
return toar_data.get_data(opts, headers)[-1]
def load_series_information(station_name: List[str], station_type: str_or_none, network_name: str_or_none,
join_url_base: str, headers: Dict, data_origin: Dict = None, stat_var: Dict = None) -> [Dict, Dict]:
"""
......
......@@ -9,6 +9,7 @@ from . import join, toar_data_v2
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import pandas as pd
class EmptyQueryResult(Exception):
......@@ -29,9 +30,11 @@ def create_url(base: str, service: str, param_id: Union[str, int, None] = None,
:return: combined url as string
"""
if not base.endswith("/"):
base += "/"
url = f"{base}{service}"
url = f"{base}"
if not url.endswith("/"):
url += "/"
if service is not None:
url = f"{url}{service}"
if not url.endswith("/"):
url += "/"
if param_id is not None:
......@@ -79,11 +82,19 @@ def download_toar(station, toar_stats, sampling, data_origin, station_type=None
df_toar, meta_toar = toar_data_v2.download_toar(station, toar_stats, sampling=sampling, data_origin=data_origin)
# load join data (toar-data v1)
df_join, meta_join = join.download_join(station_name=station, stat_var=toar_stats, sampling=sampling,
station_type=station_type, data_origin=data_origin)
return df_toar
def merge_toar_join(df_toar, df_join):
start_date = min([df_toar.index.min(), df_join.index.min()])
\ No newline at end of file
df_join, _ = join.download_join(station_name=station, stat_var=toar_stats, sampling=sampling,
station_type=station_type, data_origin=data_origin)
# merge both data sources with priority on toar-data v2
df_merged = merge_toar_join(df_toar, df_join, sampling)
return df_merged, meta_toar
def merge_toar_join(df_toar, df_join, sampling):
start_date = min([df_toar.index.min(), df_join.index.min()])
end_date = max([df_toar.index.max(), df_join.index.max()])
freq = {"hourly": "1H", "daily": "1d"}.get(sampling)
full_time = pd.date_range(start_date, end_date, freq=freq)
full_data = df_toar.reindex(full_time)
full_data.update(df_join, overwrite=False)
return full_data
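
A toy sketch of the merge priority (values are made up): toar-data v2 values win, JOIN (v1) values only fill gaps on the regular hourly/daily grid.

import pandas as pd
from mlair.helpers.data_sources.toar_data import merge_toar_join

idx = pd.date_range("2010-01-01", periods=3, freq="1H")
df_toar = pd.DataFrame({"o3": [30.0, None, 32.0]}, index=idx)
df_join = pd.DataFrame({"o3": [10.0, 11.0, 12.0]}, index=idx)
merge_toar_join(df_toar, df_join, "hourly")
# -> o3 column: [30.0, 11.0, 32.0]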
......@@ -8,6 +8,8 @@ from typing import Union, List, Dict
from io import StringIO
import pandas as pd
import pytz
from timezonefinder import TimezoneFinder
from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings
from mlair.helpers import to_list
......@@ -65,10 +67,13 @@ def download_toar(station_name: Union[str, List[str]], stat_var: dict,
data_dict = {}
for var, meta in timeseries_meta.items():
logging.debug(f"load {var}")
meta, opts = prepare_meta(meta, sampling, stat_var, var)
data_var = load_timeseries_data([meta[0]], data_url_base, opts, headers)[0]
data_dict[var] = data_var
meta_and_opts = prepare_meta(meta, sampling, stat_var, var)
data_var = []
for var_meta, opts in meta_and_opts:
data_var.extend(load_timeseries_data(var_meta, data_url_base, opts, headers, sampling))
data_dict[var] = merge_data(*data_var, sampling=sampling)
data = pd.DataFrame.from_dict(data_dict)
data = correct_timezone(data, station_meta, sampling)
meta = combine_meta_data(station_meta, {k: v[0] for k, v in timeseries_meta.items()})
meta = pd.DataFrame.from_dict(meta, orient='index')
......@@ -76,15 +81,52 @@ def download_toar(station_name: Union[str, List[str]], stat_var: dict,
return data, meta
def merge_data(*args, sampling="hourly"):
start_date = min(map(lambda x: x.index.min(), args))
end_date = max(map(lambda x: x.index.max(), args))
freq = {"hourly": "1H", "daily": "1d"}.get(sampling)
full_time = pd.date_range(start_date, end_date, freq=freq)
full_data = args[0].reindex(full_time)
if not isinstance(full_data, pd.DataFrame):
full_data = full_data.to_frame()
for d in args[1:]:
full_data.update(d, overwrite=False)
return full_data.squeeze()
def correct_timezone(data, meta, sampling):
"""
Extract timezone information and convert data index to this timezone.
    Uses UTC if no information is provided. Note that this method only modifies data with sampling='hourly'. In all
    other cases, it returns the given data without any change. This method expects the date index of data to be in UTC.
Timezone information is not added to the index to get rid of daylight saving time and ambiguous timestamps.
"""
if sampling == "hourly":
tz_info = meta.get("timezone", "UTC")
try:
tz = pytz.timezone(tz_info)
except pytz.exceptions.UnknownTimeZoneError as e:
lon, lat = meta["coordinates"]["lng"], meta["coordinates"]["lat"]
tz = pytz.timezone(TimezoneFinder().timezone_at(lng=lon, lat=lat))
index = data.index
index = index.tz_localize(None)
utc_offset = tz.utcoffset(index[0]) - tz.dst(index[0])
data.index = index + utc_offset
return data
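
A toy sketch of the shift to local standard time (station and values are made up): utcoffset minus dst yields the DST-free offset, which is simply added to the naive UTC index, so no timezone info or ambiguous timestamps remain.

import pandas as pd
from mlair.helpers.data_sources.toar_data_v2 import correct_timezone

idx = pd.date_range("2010-06-01 00:00", periods=2, freq="1H")  # UTC timestamps in summer
data = pd.DataFrame({"o3": [30.0, 31.0]}, index=idx)
meta = {"timezone": "Europe/Berlin"}
correct_timezone(data, meta, "hourly").index[0]
# -> Timestamp('2010-06-01 01:00:00'), i.e. CET (UTC+1) although DST applies in June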
def prepare_meta(meta, sampling, stat_var, var):
meta = meta[0]
opts = {}
if sampling == "daily":
opts["timeseries_id"] = meta.pop("id")
meta["id"] = None
opts["names"] = stat_var[var]
opts["sampling"] = sampling
return [meta], opts
out = []
for m in meta:
opts = {}
if sampling == "daily":
opts["timeseries_id"] = m.pop("id")
m["id"] = None
opts["names"] = stat_var[var]
opts["sampling"] = sampling
out.append(([m], opts))
return out
def combine_meta_data(station_meta, timeseries_meta):
......@@ -120,16 +162,18 @@ def combine_meta_data(station_meta, timeseries_meta):
return meta
def load_timeseries_data(timeseries_meta, url_base, opts, headers):
def load_timeseries_data(timeseries_meta, url_base, opts, headers, sampling):
coll = []
for meta in timeseries_meta:
series_id = meta["id"]
# opts = {"base": url_base, "service": f"data/timeseries/{series_id}"}
opts = {"base": url_base, "service": f"data/timeseries", "param_id": series_id, "format": "csv", **opts}
if sampling != "hourly":
opts["service"] = None
res = get_data(opts, headers, as_json=False)
data = pd.read_csv(StringIO(res), comment="#", index_col="datetime", parse_dates=True,
infer_datetime_format=True)
data = data["value"].rename(meta["variable"]["name"])
data = data[opts.get("names", "value")].rename(meta["variable"]["name"])
coll.append(data)
return coll
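
A sketch of the two request shapes load_timeseries_data now issues (base url is a placeholder; query-string ordering and the exact kwargs handling of create_url are not shown in this hunk, so treat the calls as an assumption):

from mlair.helpers.data_sources.toar_data import create_url

# hourly: the series id stays part of the path
create_url("https://example.org/api/", "data/timeseries", param_id=123, format="csv")
# daily: service is dropped, the id travels as the timeseries_id query parameter together
# with the requested statistics name and sampling
create_url("https://example.org/api/", None, timeseries_id=123, names="dma8eu",
           sampling="daily", format="csv")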
......
......@@ -10,7 +10,7 @@ from dill.source import getsource
from mlair.configuration import path_config
from mlair import helpers
from mlair.configuration.defaults import DEFAULT_STATIONS, DEFAULT_VAR_ALL_DICT, DEFAULT_NETWORK, DEFAULT_STATION_TYPE, \
from mlair.configuration.defaults import DEFAULT_STATIONS, DEFAULT_VAR_ALL_DICT, DEFAULT_STATION_TYPE, \
DEFAULT_START, DEFAULT_END, DEFAULT_WINDOW_HISTORY_SIZE, DEFAULT_OVERWRITE_LOCAL_DATA, \
DEFAULT_HPC_LOGIN_LIST, DEFAULT_HPC_HOST_LIST, DEFAULT_CREATE_NEW_MODEL, DEFAULT_TRAIN_MODEL, \
DEFAULT_FRACTION_OF_TRAINING, DEFAULT_EXTREME_VALUES, DEFAULT_EXTREMES_ON_RIGHT_TAIL_ONLY, DEFAULT_PERMUTE_DATA, \
......
......@@ -13,6 +13,7 @@ from typing import Dict, Tuple, Union, List, Callable
import numpy as np
import pandas as pd
import xarray as xr
import datetime as dt
from mlair.configuration import path_config
from mlair.data_handler import Bootstraps, KerasIterator
......
......@@ -114,8 +114,8 @@ class PreProcessing(RunEnvironment):
+------------+-------------------------------------------+---------------+---------------+---------------+---------+-------+--------+
"""
meta_cols = ['station_name', 'station_lon', 'station_lat', 'station_alt']