
Resolve "release v1.2.0"

Merged: Ghost User requested to merge release_v1.2.0 into master
1 file changed: +23 −10
@@ -215,16 +215,24 @@ class PreProcessing(RunEnvironment):
         valid_stations = []
         kwargs = self.data_store.create_args_dict(data_handler.requirements(), scope=set_name)
-        logging.info("-------start parallel loop------")
-        pool = multiprocessing.Pool(4)
-        output = [pool.apply_async(f_proc, args=(data_handler, station, set_name, store_processed_data), kwds=kwargs)
-                  for station in set_stations]
-        for p in output:
-            dh, s = p.get()
-            if dh is not None:
-                collection.add(dh)
-                valid_stations.append(s)
+        if os.cpu_count() > 1:  # parallel solution
+            logging.info("use parallel validate station approach")
+            pool = multiprocessing.Pool()
+            output = [
+                pool.apply_async(f_proc, args=(data_handler, station, set_name, store_processed_data), kwds=kwargs)
+                for station in set_stations]
+            for p in output:
+                dh, s = p.get()
+                if dh is not None:
+                    collection.add(dh)
+                    valid_stations.append(s)
+        else:  # serial solution
+            logging.info("use serial validate station approach")
+            for station in set_stations:
+                dh, s = f_proc(data_handler, station, set_name, store_processed_data, **kwargs)
+                if dh is not None:
+                    collection.add(dh)
+                    valid_stations.append(s)
         logging.info(f"run for {t_outer} to check {len(set_stations)} station(s). Found {len(collection)}/"
                      f"{len(set_stations)} valid stations.")
@@ -271,6 +279,11 @@ class PreProcessing(RunEnvironment):
 def f_proc(data_handler, station, name_affix, store, **kwargs):
+    """
+    Try to create a data handler for the given arguments. If the build fails, this station does not fulfil all
+    requirements and f_proc therefore returns None as indication. On a successful build, f_proc returns the built
+    data handler and the station that was used. This function must be implemented globally to work together with
+    multiprocessing.
+    """
     try:
         res = data_handler.build(station, name_affix=name_affix, store_processed_data=store,
                                  **kwargs)
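
The docstring's last sentence is the operative constraint for the pool calls above: multiprocessing pickles a worker by its qualified name, so f_proc must live at module level rather than being nested in a method or defined as a lambda. A short sketch of the contract the docstring describes, with DummyHandler and its build() as hypothetical stand-ins for the real data handler class:

import logging


class DummyHandler:
    @classmethod
    def build(cls, station, **kwargs):
        # Hypothetical build step: reject station ids that look invalid.
        if not station.startswith("DE"):
            raise ValueError(f"unknown station {station}")
        return cls()


def f_proc_sketch(data_handler, station, **kwargs):
    # Module-level worker: return (handler, station) on success,
    # (None, station) as the failure indication described above.
    try:
        res = data_handler.build(station, **kwargs)
    except ValueError as e:  # the real code would list its expected build errors here
        logging.info(f"remove station {station} because it raised an error: {e}")
        res = None
    return res, station


print(f_proc_sketch(DummyHandler, "DEBW107"))  # (<DummyHandler object ...>, 'DEBW107')
print(f_proc_sketch(DummyHandler, "XX001"))    # (None, 'XX001')

Returning the station alongside the result keeps the mapping from result to input explicit, so the caller never has to infer which station a None came from.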