Skip to content
Snippets Groups Projects
Select Git revision
  • 5843d2f64b3e166a07157fbb2d845d92cf7582c2
  • master default protected
  • enxhi_issue460_remove_TOAR-I_access
  • michael_issue459_preprocess_german_stations
  • sh_pollutants
  • develop protected
  • release_v2.4.0
  • michael_issue450_feat_load-ifs-data
  • lukas_issue457_feat_set-config-paths-as-parameter
  • lukas_issue454_feat_use-toar-statistics-api-v2
  • lukas_issue453_refac_advanced-retry-strategy
  • lukas_issue452_bug_update-proj-version
  • lukas_issue449_refac_load-era5-data-from-toar-db
  • lukas_issue451_feat_robust-apriori-estimate-for-short-timeseries
  • lukas_issue448_feat_load-model-from-path
  • lukas_issue447_feat_store-and-load-local-clim-apriori-data
  • lukas_issue445_feat_data-insight-plot-monthly-distribution
  • lukas_issue442_feat_bias-free-evaluation
  • lukas_issue444_feat_choose-interp-method-cams
  • 414-include-crps-analysis-and-other-ens-verif-methods-or-plots
  • lukas_issue384_feat_aqw-data-handler
  • v2.4.0 protected
  • v2.3.0 protected
  • v2.2.0 protected
  • v2.1.0 protected
  • Kleinert_etal_2022_initial_submission
  • v2.0.0 protected
  • v1.5.0 protected
  • v1.4.0 protected
  • v1.3.0 protected
  • v1.2.1 protected
  • v1.2.0 protected
  • v1.1.0 protected
  • IntelliO3-ts-v1.0_R1-submit
  • v1.0.0 protected
  • v0.12.2 protected
  • v0.12.1 protected
  • v0.12.0 protected
  • v0.11.0 protected
  • v0.10.0 protected
  • IntelliO3-ts-v1.0_initial-submit
41 results

join.py

Blame
  • join.py 5.93 KiB
    # Module metadata: authors and creation date of this JOIN download module.
    __author__ = 'Felix Kleinert, Lukas Leufen'
    __date__ = '2019-10-16'
    
    
    import requests
    import logging
    import pandas as pd
    import datetime as dt
    from typing import Iterator, Union, List, Dict
    from src import helpers
    
    # Base URL of the JOIN REST interface; the service name (e.g. "series" or "stats") and the query options are
    # appended to it by create_url.
    join_url_base = 'https://join.fz-juelich.de/services/rest/surfacedata/'
    # Type alias for arguments that are either a string or None.
    str_or_none = Union[str, None]
    
    
    class EmptyQueryResult(Exception):
        """Raised when a query sent to JOIN does not return any data."""
    
    
    def download_join(station_name: Union[str, List[str]], stat_var: dict, station_type: str = None,
                      network_name: str = None) -> [pd.DataFrame, pd.DataFrame]:
        """
        Read data from JOIN/TOAR.

        The series available for the station are looked up first. For every available variable that is also a key of
        stat_var, the requested statistic is downloaded and stored as one column of the returned data frame.

        :param station_name: Station name e.g. DEBY122 (a single name or a list of names)
        :param stat_var: variable names as keys (compared against the lower-cased names of the available series, e.g.
            'o3'), JOIN statistic names as values (e.g. 'dma8eu', 'average_values')
        :param station_type: set the station type like "traffic" or "background", can be none
        :param network_name: set the measurement network like "UBA" or "AIRBASE", can be none
        :returns:
            - df - data frame with all variables and statistics
            - meta - data frame with all meta information, taken from the last downloaded variable
        :raises EmptyQueryResult: if no variable was downloaded or the last response was empty
        """
        # a single station name is wrapped into a list
        station_name = helpers.to_list(station_name)

        # variable name -> series id for the requested station
        series_ids = load_series_information(station_name, station_type, network_name)

        response = None
        collected = None
        for var in _lower_list(sorted(series_ids.keys())):
            # only variables with a requested statistic are downloaded
            if var not in stat_var:
                continue

            logging.info('load: {}'.format(var))

            requested_stat = stat_var[var]
            query = dict(base=join_url_base, service='stats', id=series_ids[var], statistics=requested_stat,
                         sampling='daily', capture=0, min_data_length=1460)
            response = get_data(query)

            # the statistic is stored under its local name in the response, not under the JOIN request name
            local_stat = _correct_stat_name(requested_stat)
            collected = _save_to_pandas(collected, response, local_stat, var)

            logging.debug('finished: {}'.format(var))

        if not response:
            raise EmptyQueryResult("No data found in JOIN.")

        meta = pd.DataFrame.from_dict(response['metadata'], orient='index')
        meta.columns = station_name
        return collected, meta
    
    
    def get_data(opts: Dict) -> Union[Dict, List]:
        """
        Request data from JOIN with the requests framework and return the decoded json payload.

        :param opts: options used to build the request url (forwarded as keyword arguments to create_url)
        :return: requested data, either a list or a dictionary depending on the structure of the response
        """
        return requests.get(create_url(**opts)).json()
    
    
    def load_series_information(station_name: List[str], station_type: str_or_none, network_name: str_or_none) -> Dict:
        """
        Collect the ids of all series that are available for the given station, station type and network.

        Only the first entry of station_name is used for the query.

        :param station_name: Station name e.g. DEBW107 (as list, first element is queried)
        :param station_type: station type like "traffic" or "background"
        :param network_name: measurement network of the station like "UBA" or "AIRBASE"
        :return: dictionary with the lower-cased parameter name (variable) as key and the series id as value
        """
        query = {"base": join_url_base, "service": "series", "station_id": station_name[0],
                 "station_type": station_type, "network_name": network_name}
        series_by_variable = {}
        for entry in get_data(query):
            # position 3 of an entry is used as variable name, position 0 as series id
            series_by_variable[entry[3].lower()] = entry[0]
        return series_by_variable
    
    
    def _save_to_pandas(df: Union[pd.DataFrame, None], data: dict, stat: str, var: str) -> pd.DataFrame:
        """
        Save given data in data frame. If given data frame is not empty, the data is appened as new column.
        :param df: data frame to append the new data, can be none
        :param data: new data to append or format as data frame containing the keys 'datetime' and '<stat>'
        :param stat: extracted statistic to get values from data (e.g. 'mean', 'dma8eu')
        :param var: variable the data is from (e.g. 'o3')
        :return: new created or concatenated data frame
        """
        index = map(lambda s: dt.datetime.strptime(s, "%Y-%m-%d %H:%M"), data['datetime'])
        if df is None:
            df = pd.DataFrame(data[stat], index=index, columns=[var])
        else:
            df = pd.concat([df, pd.DataFrame(data[stat], index=index, columns=[var])], axis=1)
        return df
    
    
    def _correct_stat_name(stat: str) -> str:
        """
        Map given statistic name to new namespace defined by mapping dict. Return given name stat if not element of mapping
        namespace.
        :param stat: namespace from JOIN server
        :return: stat mapped to local namespace
        """
        mapping = {'average_values': 'mean', 'maximum': 'max', 'minimum': 'min'}
        return mapping.get(stat, stat)
    
    
    def _lower_list(args: List[str]) -> Iterator[str]:
        """
        lower all elements of given list
        :param args: list with string entries to lower
        :return: iterator that lowers all list entries
        """
        for string in args:
            yield string.lower()
    
    
    def create_url(base: str, service: str, **kwargs: Union[str, int, float, None]) -> str:
        """
        Build a request url from the base url, the service type and any number of keyword options.

        Options are appended as 'key=value' pairs joined by '&'. Options whose value is None are left out. Values are
        inserted as they are, without url encoding.

        :param base: basic url of the rest service, a trailing "/" is added if missing
        :param service: service type, e.g. series, stats
        :param kwargs: keyword pairs for optional request specifications, e.g. 'statistics=maximum'
        :return: combined url as string
        """
        root = base if base.endswith("/") else base + "/"
        # options explicitly set to None are not part of the request
        query = [f"{key}={value}" for key, value in kwargs.items() if value is not None]
        query_string = "&".join(query)
        return f"{root}{service}/?{query_string}"
    
    
    if __name__ == "__main__":
        # example run: request a set of variables with their statistics for a single station
        station = 'DEBW107'
        var_all_dic = {
            'o3': 'dma8eu',
            'relhum': 'average_values',
            'temp': 'maximum',
            'u': 'average_values',
            'v': 'average_values',
            'no': 'dma8eu',
            'no2': 'dma8eu',
            'cloudcover': 'average_values',
            'pblheight': 'maximum',
        }
        download_join(station, var_all_dic)