
Resolve "release v1.2.0"

Merged: Ghost User requested to merge release_v1.2.0 into master

5 files changed, +163 −1
@@ -6,6 +6,8 @@ __date__ = '2019-11-25'
 import logging
 import os
 from typing import Tuple
+import multiprocessing
+import requests
 
 import numpy as np
 import pandas as pd
@@ -201,6 +203,50 @@ class PreProcessing(RunEnvironment):
         Valid means that there is data available for the given time range (as included in `kwargs`). The shape and the
         loading time are logged in debug mode.
 
+        :return: Corrected list containing only valid station IDs.
+        """
+        t_outer = TimeTracking()
+        logging.info(f"check valid stations started{' (%s)' % (set_name if set_name is not None else 'all')}")
+        # calculate transformation using train data
+        if set_name == "train":
+            logging.info("setup transformation using train data exclusively")
+            self.transformation(data_handler, set_stations)
+        # start station check
+        collection = DataCollection()
+        valid_stations = []
+        kwargs = self.data_store.create_args_dict(data_handler.requirements(), scope=set_name)
+
+        if multiprocessing.cpu_count() > 1:  # parallel solution
+            logging.info("use parallel validate station approach")
+            pool = multiprocessing.Pool()
+            output = [
+                pool.apply_async(f_proc, args=(data_handler, station, set_name, store_processed_data), kwds=kwargs)
+                for station in set_stations]
+            for p in output:
+                dh, s = p.get()
+                if dh is not None:
+                    collection.add(dh)
+                    valid_stations.append(s)
+        else:  # serial solution
+            logging.info("use serial validate station approach")
+            for station in set_stations:
+                dh, s = f_proc(data_handler, station, set_name, store_processed_data, **kwargs)
+                if dh is not None:
+                    collection.add(dh)
+                    valid_stations.append(s)
+
+        logging.info(f"run for {t_outer} to check {len(set_stations)} station(s). Found {len(collection)}/"
+                     f"{len(set_stations)} valid stations.")
+        return collection, valid_stations
+
+    def validate_station_old(self, data_handler: AbstractDataHandler, set_stations, set_name=None,
+                             store_processed_data=True):
+        """
+        Check if all given stations in `set_stations` are valid.
+
+        Valid means that there is data available for the given time range (as included in `kwargs`). The shape and the
+        loading time are logged in debug mode.
+
         :return: Corrected list containing only valid station IDs.
         """
         t_outer = TimeTracking()
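
For review context, a minimal, self-contained sketch of the fan-out pattern the new validate_station introduces: submit one apply_async task per station, collect the (data_handler, station) tuples with .get(), and keep only the stations whose handler built successfully. The worker check_station and the station IDs below are made up for illustration; only the control flow mirrors the hunk above.

import multiprocessing


def check_station(station):
    # Stand-in for f_proc: returns (result, station); None marks an invalid station.
    # Stations ending in "0" are arbitrarily treated as invalid here.
    result = None if station.endswith("0") else f"data({station})"
    return result, station


if __name__ == "__main__":  # guard required for spawn-based start methods
    stations = ["S010", "S107", "S081"]
    if multiprocessing.cpu_count() > 1:  # parallel branch, as in the diff
        with multiprocessing.Pool() as pool:
            output = [pool.apply_async(check_station, args=(s,)) for s in stations]
            results = [p.get() for p in output]  # .get() re-raises worker exceptions
    else:  # serial fallback
        results = [check_station(s) for s in stations]
    valid_stations = [s for dh, s in results if dh is not None]
    print(valid_stations)  # -> ['S107', 'S081']

One design note: the hunk creates the Pool without pool.close()/join() or a context manager; the sketch uses `with` so worker processes are reliably torn down, which may be worth considering for this MR.
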
@@ -231,3 +277,18 @@ class PreProcessing(RunEnvironment):
         transformation_dict = data_handler.transformation(stations, **kwargs)
         if transformation_dict is not None:
             self.data_store.set("transformation", transformation_dict)
+
+
+def f_proc(data_handler, station, name_affix, store, **kwargs):
+    """
+    Try to create a data handler for the given arguments. If the build fails, this station does not fulfil all
+    requirements and therefore f_proc returns None as indication. On a successful build, f_proc returns the built data
+    handler and the station that was used. This function must be implemented globally to work together with
+    multiprocessing.
+    """
+    try:
+        res = data_handler.build(station, name_affix=name_affix, store_processed_data=store,
+                                 **kwargs)
+    except (AttributeError, EmptyQueryResult, KeyError, requests.ConnectionError) as e:
+        logging.info(f"remove station {station} because it raised an error: {e}")
+        res = None
+    return res, station
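
The docstring's remark that f_proc "must be implemented globally" is the usual pickling constraint: multiprocessing ships each task to the worker processes via pickle, and pickle serializes functions by their importable qualified name, so module-level functions work while nested functions and lambdas do not. A quick illustration of just that constraint (the names module_level and nested are hypothetical, not part of this MR):

import pickle


def module_level(x):  # importable as <module>.module_level, so picklable
    return x * 2


def main():
    def nested(x):  # local object without an importable qualified name
        return x * 2

    pickle.dumps(module_level)  # fine: pickled by reference
    try:
        pickle.dumps(nested)  # raises: can't pickle local object
    except (pickle.PicklingError, AttributeError) as e:
        print(f"unpicklable worker: {e}")


if __name__ == "__main__":
    main()

Returning (res, station) with res=None as the failure sentinel, rather than letting the build raise, also keeps the parallel loop simple: p.get() only re-raises unexpected errors, while the four anticipated failure types are logged and filtered out.
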