"""Helper functions and classes: path handling, time tracking, loss factory and small data utilities."""

__author__ = 'Lukas Leufen, Felix Kleinert'
__date__ = '2019-10-21'

import datetime as dt
import logging
import math
import os
import re
import socket
import time
from typing import Callable, Dict

import keras.backend as K
import xarray as xr


def to_list(arg):
    """
    Wrap the argument in a list unless it already is a list.

    :param arg: any object
    :return: arg itself if it is a list instance, otherwise [arg]
    """
    if not isinstance(arg, list):
        arg = [arg]
    return arg


def check_path_and_create(path):
    """
    Create the given directory (including parents) and log the outcome on debug level.

    A FileExistsError raised by os.makedirs is swallowed and only logged; any other OSError propagates.

    :param path: directory path to create
    """
    try:
        os.makedirs(path)
        logging.debug(f"Created path: {path}")
    except FileExistsError:
        logging.debug(f"Path already exists: {path}")


def l_p_loss(power: int):
    """
    Calculate the L<p> loss for given power p.

    L1 (p=1) is equal to the mean absolute error (MAE), L2 (p=2) is equal to the mean squared error (MSE), ...

    :param power: set the power of the error calculus
    :return: loss function for given power; it takes (y_true, y_pred) and averages over the last axis
    """

    def loss(y_true, y_pred):
        return K.mean(K.pow(K.abs(y_pred - y_true), power), axis=-1)

    return loss


class TimeTracking(object):
    """
    Track time to measure execution time.

    Time tracking automatically starts on initialisation (unless start=False) and ends by calling the stop method.
    The duration can always be shown by printing the time tracking object or calling duration(). The object can also
    be used as a context manager: on exit the tracker is stopped and the elapsed time is logged on info level.

    Querying the duration of a tracker that has never been started raises a TypeError, because the start time is
    still None.

    :param start: start the time tracking directly on initialisation (default True)
    :param name: label used in the log message written on leaving the context (default "undefined job")
    """

    def __init__(self, start=True, name="undefined job"):
        self.start = None
        self.end = None
        self._name = name
        if start:
            self._start()

    def _start(self):
        # (re)starting always discards a previous end time
        self.start = time.time()
        self.end = None

    def _end(self):
        self.end = time.time()

    def _duration(self):
        # a running tracker reports the time elapsed so far
        if self.end is not None:
            return self.end - self.start
        return time.time() - self.start

    def __repr__(self):
        # round up to full seconds so that timedelta renders as h:mm:ss without fractions
        return f"{dt.timedelta(seconds=math.ceil(self._duration()))} (hh:mm:ss)"

    def run(self):
        """Start (or restart) the time tracking."""
        self._start()

    def stop(self, get_duration=False):
        """
        Stop the time tracking.

        :param get_duration: return the measured duration in seconds if True
        :return: duration in seconds if get_duration is set, otherwise None
        :raises AssertionError: if the tracking was already stopped
        """
        if self.end is not None:
            msg = f"Time was already stopped {time.time() - self.end}s ago."
            logging.error(msg)
            raise AssertionError(msg)
        self._end()
        if get_duration:
            return self.duration()
        return None

    def duration(self):
        """Return the measured (or so far elapsed) time in seconds."""
        return self._duration()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()
        logging.info(f"{self._name} finished after {self}")


def prepare_host(create_new=True):
    """
    Select the data path according to the host name of the current machine.

    :param create_new: create the data path if it does not exist yet (default True)
    :return: data path for the current host
    :raises OSError: if the host name is not known
    :raises NotADirectoryError: if the path does not exist and is either not allowed to be created
        (create_new=False) or cannot be created due to missing permissions
    """
    hostname = socket.gethostname()
    try:
        user = os.getlogin()
    except OSError:
        # no login name available for this process
        user = "default"

    # the order of the checks is significant for host names matching more than one rule
    if hostname in ('ZAM144', 'zam347'):
        path = f'/home/{user}/Data/toar_daily/'
    elif hostname == 'linux-aa9b':
        path = f'/home/{user}/machinelearningtools/data/toar_daily/'
    elif (len(hostname) > 2) and (hostname[:2] == 'jr'):
        path = f'/p/project/cjjsc42/{user}/DATA/toar_daily/'
    elif (len(hostname) > 2) and (hostname[:2] == 'jw'):
        path = f'/p/home/jusers/{user}/juwels/intelliaq/DATA/toar_daily/'
    elif "runner-6HmDp9Qd-project-2411-concurrent" in hostname:
        path = f'/home/{user}/machinelearningtools/data/toar_daily/'
    else:
        logging.error(f"unknown host '{hostname}'")
        raise OSError(f"unknown host '{hostname}'")

    if os.path.exists(path):
        logging.debug(f"set path to: {path}")
        return path

    missing_msg = f"path '{path}' does not exist for host '{hostname}'."
    if not create_new:
        logging.error(missing_msg)
        raise NotADirectoryError(missing_msg)
    try:
        check_path_and_create(path)
    except PermissionError as err:
        logging.error(missing_msg)
        raise NotADirectoryError(missing_msg) from err
    return path


def set_experiment_name(experiment_date=None, experiment_path=None):
    """
    Build the experiment name and the corresponding experiment path.

    :param experiment_date: prefix of the experiment name; the name is "TestExperiment" if None
    :param experiment_path: parent directory of the experiment; defaults to the parent directory of the directory
        containing this file if None
    :return: tuple of experiment name and absolute experiment path
    """
    if experiment_date is None:
        experiment_name = "TestExperiment"
    else:
        experiment_name = f"{experiment_date}_network"
    if experiment_path is None:
        experiment_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", experiment_name))
    else:
        experiment_path = os.path.join(os.path.abspath(experiment_path), experiment_name)
    return experiment_name, experiment_path


def set_bootstrap_path(bootstrap_path, data_path, sampling):
    """
    Return the bootstrap path and make sure that the directory exists.

    :param bootstrap_path: path to use; if None, "<data_path>/../bootstrap_<sampling>" is used
    :param data_path: base path used to derive the default bootstrap path
    :param sampling: suffix of the default bootstrap directory name
    :return: bootstrap path
    """
    if bootstrap_path is None:
        bootstrap_path = os.path.join(data_path, "..", f"bootstrap_{sampling}")
    check_path_and_create(bootstrap_path)
    return bootstrap_path


class PyTestRegex:
    """Assert that a given string meets some expectations (compares equal if the pattern matches at its start)."""

    def __init__(self, pattern: str, flags: int = 0):
        self._regex = re.compile(pattern, flags)

    def __eq__(self, actual: str) -> bool:
        return bool(self._regex.match(actual))

    def __repr__(self) -> str:
        return self._regex.pattern


def dict_to_xarray(d: Dict, coordinate_name: str) -> xr.DataArray:
    """
    Convert a dictionary of 2D-xarrays to a single 3D-xarray.

    The name of the new coordinate axis follows <coordinate_name>, the dictionary keys become its values. The arrays
    stored in the dictionary are not modified: the coordinate is attached to a shallow copy of each of them. For a
    dictionary with a single entry, the array is returned with the key attached as scalar coordinate (nothing is
    concatenated); for an empty dictionary the result is None.

    :param d: dictionary with 2D-xarrays
    :param coordinate_name: name of the newly created axis (2D -> 3D)
    :return: combined xarray
    """
    xarray = None
    for k, v in d.items():
        # shallow copy: shares the data but keeps the caller's coordinates untouched
        labelled = v.copy(deep=False)
        labelled.coords[coordinate_name] = k
        if xarray is None:
            xarray = labelled
        else:
            xarray = xr.concat([xarray, labelled], coordinate_name)
    return xarray


def float_round(number: float, decimals: int = 0, round_type: Callable = math.ceil) -> float:
    """
    Perform given rounding operation on number with the precision of decimals.

    :param number: the number to round
    :param decimals: number of decimals of the rounding operation (default 0 -> round to next integer value)
    :param round_type: the actual rounding operation. Can be any callable like math.ceil, math.floor or the python
        built-in round operation.
    :return: rounded number with desired precision
    """
    multiplier = 10. ** decimals
    return round_type(number * multiplier) / multiplier


def list_pop(list_full: list, pop_items):
    """
    Return a copy of list_full without the given items; list_full itself is left untouched.

    Note the different handling depending on the number of items: a single item is removed once (first occurrence)
    and must be present, whereas for several items all occurrences are dropped and missing items are ignored.

    :param list_full: list to remove items from
    :param pop_items: single item or list of items to remove; an empty list returns an unchanged copy
    :return: new list without the given items
    :raises ValueError: if a single item is given that is not contained in list_full
    """
    pop_items = to_list(pop_items)
    if not pop_items:
        # nothing to remove
        return list_full.copy()
    if len(pop_items) > 1:
        return [e for e in list_full if e not in pop_items]
    remaining = list_full.copy()
    remaining.remove(pop_items[0])
    return remaining