diff --git a/.gitignore b/.gitignore
index 305a5d1b9420eb62da24772fc1f4b263c1f3efe1..01ca296747666fee411aedca6fbb15a554a0bb51 100644
--- a/.gitignore
+++ b/.gitignore
@@ -42,6 +42,13 @@ ehthumbs.db
 Thumbs.db
 .idea/
 /venv/
+/venv*/
+/build/
+
+# ignore HPC related scripts #
+##############################
+run_*_develgpus.bash
+run_*_gpus.bash
 
 # don't check data and plot folder #
 ####################################
diff --git a/mlair/run_modules/pre_processing.py b/mlair/run_modules/pre_processing.py
index 4cee4a9744f33c86e8802aad27125cf0e0b30f3a..13a4b38184ea3c00c9cf5990c35c6d7bdcb6225d 100644
--- a/mlair/run_modules/pre_processing.py
+++ b/mlair/run_modules/pre_processing.py
@@ -6,6 +6,8 @@ __date__ = '2019-11-25'
 import logging
 import os
 from typing import Tuple
+import multiprocessing
+import requests
 
 import numpy as np
 import pandas as pd
@@ -201,6 +203,50 @@ class PreProcessing(RunEnvironment):
         Valid means, that there is data available for the given time range (is included in `kwargs`). The shape and the
         loading time are logged in debug mode.
 
+        :return: Corrected list containing only valid station IDs.
+        """
+        t_outer = TimeTracking()
+        logging.info(f"check valid stations started{' (%s)' % (set_name if set_name is not None else 'all')}")
+        # calculate transformation using train data
+        if set_name == "train":
+            logging.info("setup transformation using train data exclusively")
+            self.transformation(data_handler, set_stations)
+        # start station check
+        collection = DataCollection()
+        valid_stations = []
+        kwargs = self.data_store.create_args_dict(data_handler.requirements(), scope=set_name)
+
+        if multiprocessing.cpu_count() > 1:  # parallel solution
+            logging.info("use parallel validate station approach")
+            pool = multiprocessing.Pool()
+            output = [
+                pool.apply_async(f_proc, args=(data_handler, station, set_name, store_processed_data), kwds=kwargs)
+                for station in set_stations]
+            for p in output:
+                dh, s = p.get()
+                if dh is not None:
+                    collection.add(dh)
+                    valid_stations.append(s)
+        else:  # serial solution
+            logging.info("use serial validate station approach")
+            for station in set_stations:
+                dh, s = f_proc(data_handler, station, set_name, store_processed_data, **kwargs)
+                if dh is not None:
+                    collection.add(dh)
+                    valid_stations.append(s)
+
+        logging.info(f"run for {t_outer} to check {len(set_stations)} station(s). Found {len(collection)}/"
+                     f"{len(set_stations)} valid stations.")
+        return collection, valid_stations
+
+    def validate_station_old(self, data_handler: AbstractDataHandler, set_stations, set_name=None,
+                             store_processed_data=True):
+        """
+        Check if all given stations in `all_stations` are valid.
+
+        Valid means, that there is data available for the given time range (is included in `kwargs`). The shape and the
+        loading time are logged in debug mode.
+
         :return: Corrected list containing only valid station IDs.
         """
         t_outer = TimeTracking()
@@ -231,3 +277,18 @@ class PreProcessing(RunEnvironment):
         transformation_dict = data_handler.transformation(stations, **kwargs)
         if transformation_dict is not None:
             self.data_store.set("transformation", transformation_dict)
+
+
+def f_proc(data_handler, station, name_affix, store, **kwargs):
+    """
+    Try to create a data handler for given arguments. If build fails, this station does not fulfil all requirements and
+    therefore f_proc will return None as an indication. On a successful build, f_proc returns the built data handler
+    and the station that was used. This function must be implemented globally to work together with multiprocessing.
+ """ + try: + res = data_handler.build(station, name_affix=name_affix, store_processed_data=store, + **kwargs) + except (AttributeError, EmptyQueryResult, KeyError, requests.ConnectionError) as e: + logging.info(f"remove station {station} because it raised an error: {e}") + res = None + return res, station diff --git a/run_hourly.py b/run_hourly.py index b831cf1e1ee733a3c652c6cea364013b44cf2c0d..a21c779bc007c7fbe67c98584687be3954e1d62c 100644 --- a/run_hourly.py +++ b/run_hourly.py @@ -6,6 +6,17 @@ import argparse from mlair.workflows import DefaultWorkflow +def load_stations(): + import json + try: + filename = 'supplement/station_list_north_german_plain.json' + with open(filename, 'r') as jfile: + stations = json.load(jfile) + except FileNotFoundError: + stations = None + return stations + + def main(parser_args): workflow = DefaultWorkflow(sampling="hourly", window_history_size=48, **parser_args.__dict__) diff --git a/supplement/station_list_north_german_plain.json b/supplement/station_list_north_german_plain.json new file mode 100644 index 0000000000000000000000000000000000000000..5e92dee5facdd26f0ac044a3c8cbfeac4256bf56 --- /dev/null +++ b/supplement/station_list_north_german_plain.json @@ -0,0 +1,81 @@ +[ +"DENI031", +"DESH016", +"DEBB050", +"DEHH022", +"DEHH049", +"DEHH021", +"DEMV007", +"DESH015", +"DEBE062", +"DEHH012", +"DESH004", +"DENI062", +"DEBE051", +"DEHH011", +"DEHH023", +"DEUB020", +"DESH005", +"DEBB039", +"DEHH050", +"DENI029", +"DESH001", +"DEBE001", +"DEHH030", +"DEHH018", +"DEUB022", +"DEBB038", +"DEBB053", +"DEMV017", +"DENI063", +"DENI058", +"DESH014", +"DEUB007", +"DEUB005", +"DEBB051", +"DEUB034", +"DEST089", +"DEHH005", +"DESH003", +"DEUB028", +"DESH017", +"DEUB030", +"DEMV012", +"DENI052", +"DENI059", +"DENI060", +"DESH013", +"DEUB006", +"DEMV018", +"DEUB027", +"DEUB026", +"DEUB038", +"DEMV001", +"DEUB024", +"DEUB037", +"DESH008", +"DEMV004", +"DEUB040", +"DEMV024", +"DEMV026", +"DESH056", +"DEHH063", +"DEUB001", +"DEST069", +"DEBB040", +"DEBB028", +"DEBB048", +"DEBB063", +"DEBB067", +"DESH006", +"DEBE008", +"DESH012", +"DEHH004", +"DEBE009", +"DEHH007", +"DEBE005", +"DEHH057", +"DEHH047", +"DEBE006", +"DEBB110" +] diff --git a/test/test_run_modules/test_pre_processing.py b/test/test_run_modules/test_pre_processing.py index bdb8fdabff67ad894275c805522b9df4cf167011..602c563fa9e346bd944b20aee2c038970791730e 100644 --- a/test/test_run_modules/test_pre_processing.py +++ b/test/test_run_modules/test_pre_processing.py @@ -1,6 +1,7 @@ import logging import pytest +import mock from mlair.data_handler import DefaultDataHandler, DataCollection, AbstractDataHandler from mlair.helpers.datastore import NameNotFoundInScope @@ -34,7 +35,8 @@ class TestPreProcessing: yield pre RunEnvironment().__del__() - def test_init(self, caplog): + @mock.patch("multiprocessing.cpu_count", return_value=1) + def test_init(self, mock_cpu, caplog): ExperimentSetup(stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'], statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}) caplog.clear()