Skip to content
Snippets Groups Projects
Select Git revision
  • 67ba865a66d6261c89f762f5ff05897460b44801
  • master default protected
  • master-intern
3 results

mapping_data.py

Blame
  • user avatar
    Clara Betancourt authored
    67ba865a
    History
    mapping_data.py 7.56 KiB
    import numpy as np
    import pandas as pd
    from sklearn import preprocessing
    import math
    
    # import logging
    
    from settings import *
    from dataset_datasplit import DataSplit
    
    __author__ = 'Timo Stomberg, Clara Betancourt'
    
    
    class Data:
        """
        Preprocess the AQ-Bench dataset and store the results on the instance.

        Pipeline: read the raw dataset and its variable-description table,
        select the target column plus all input columns, drop rows containing
        fill values, then scale / one-hot encode / drop columns according to
        the 'preparation' field of the variable table, and finally split the
        data into shuffled train / validation / test arrays of dtype float32.
        """

        def __init__(self, target, scaling, scale_target):
            """
            target:       column chosen to be the target (label, y-value)
            scaling:      scaling method, 'normalize' (StandardScaler) or
                          'robust' (RobustScaler)
            scale_target: if True, the target column is scaled as well
            """

            self.target = target
            self.scaling = scaling
            self.scale_target = scale_target

            # Raw AQ-Bench data, indexed by station id.
            self.data = pd.read_csv(resources_dir + AQbench_dataset_file)
            self.data = self.data.set_index('id')
            # Per-column metadata (input/target role, fill value, preparation).
            self.info = pd.read_csv(resources_dir + AQbench_variables_file)

            # Fitted scalers keyed by column name, kept for inverse_transform.
            self.scalers = {}

            self.data_yx = pd.DataFrame()  # contains all prepared data
            self.info_yx = pd.DataFrame()  # contains all prepared info
            self.inputs = []  # list of all input names (column names)

            self.yx_train = []  # inputs and labels per split
            self.yx_val = []
            self.yx_test = []

            self.x_train = []  # input data per split
            self.x_val = []
            self.x_test = []

            self.y_train = []  # labels per split
            self.y_val = []
            self.y_test = []

            # Run the preprocessing pipeline in order.
            self.choose_columns()
            self.delete_rows()
            self.transform()
            self.list_inputs()
            self.split_into_sets()
            self.split_into_yx()
            self.change_type()

        def choose_columns(self):
            """
            Looks at info and selects the needed columns from data.
            """

            # Save the info of the chosen target (label) to info_y.
            info_y = self.info[self.info['column_name'] == self.target]
            # Save the info of all input data to info_x.
            info_x = self.info[self.info['input_target'] == 'input']
            # info_yx holds the metadata row of every column that is kept.
            self.info_yx = pd.concat([info_y, info_x])

            # Copy explicitly: column selection returns a view of self.data,
            # and transform() later assigns into these columns in place.
            # Without the copy pandas may raise SettingWithCopyWarning or
            # silently fail to write.
            self.data_yx = self.data[self.info_yx['column_name'].to_list()].copy()

            # logging.info(f'{self.data_yx.shape[0]} rows were read in.')

        def delete_rows(self):
            """
            If at least one value in a row is missing (equals the column's
            fill value), delete the whole row.
            """

            for idx, row in self.info_yx.iterrows():
                column_name = row['column_name']
                fill_value = row['fill_value']

                # Fill values are stored as strings in the variable table;
                # compare numerically where possible, otherwise as strings.
                try:
                    fill_value = float(fill_value)
                except (TypeError, ValueError):
                    fill_value = str(fill_value)

                self.data_yx = self.data_yx[self.data_yx[column_name] != fill_value]

            # logging.info(f'{self.data_yx.shape[0]} rows are remaining after deleting invalid ones.')

        def transform(self):
            """
            Prepare each column according to its 'preparation' entry:
            - 'scale':    scale the values (target only if scale_target).
            - 'one-hot':  replace the column by one-hot indicator columns.
            - 'circular': delete the column.
            Unknown preparations are reported with a warning.
            """

            # Dispatch table instead of duplicated if-branches; an unknown
            # scaling method leaves the column untouched, as before.
            scaler_classes = {
                'normalize': preprocessing.StandardScaler,
                'robust': preprocessing.RobustScaler,
            }

            for idx, row in self.info_yx.iterrows():
                column_name = row['column_name']

                if row['preparation'] == 'scale' and (column_name != self.target or self.scale_target):
                    if self.scaling in scaler_classes:
                        self.scalers[column_name] = scaler_classes[self.scaling]()
                        self.data_yx[column_name] = self.scalers[column_name].fit_transform(self.data_yx[[column_name]])

                elif row['preparation'] == 'one-hot' and column_name != self.target:
                    one_hot = pd.get_dummies(self.data_yx[column_name], prefix=column_name)

                    # Add a column for each category (containing 0 or 1) and
                    # delete the original column.
                    self.data_yx = pd.concat([self.data_yx, one_hot], axis=1)
                    del self.data_yx[column_name]

                elif row['preparation'] == 'circular' and column_name != self.target:
                    del self.data_yx[column_name]

                elif column_name != self.target:
                    print(f"!!! Warning !!! Preparation could not be found: {row['column_name']}")
                    # logging.warning(f"!!! Warning !!! Preparation could not be found: {row['column_name']}")

        def list_inputs(self):
            """
            Saves all input variable names (every column except the target)
            to self.inputs.
            """
            self.inputs = list(self.data_yx)
            self.inputs.remove(self.target)

        def split_into_sets(self):
            """
            Splits data into train, validation and test sets and shuffles
            each set reproducibly.
            """

            datasplit = DataSplit()

            # BUGFIX: DataFrame.sample returns a new frame; the original code
            # discarded the result, so the shuffle never took effect. Assign
            # it back (random_state=0 keeps the shuffle reproducible).
            self.yx_train = self.data_yx[self.data_yx.index.isin(datasplit.ids_train)]
            self.yx_train = self.yx_train.sample(frac=1, random_state=0)

            self.yx_val = self.data_yx[self.data_yx.index.isin(datasplit.ids_val)]
            self.yx_val = self.yx_val.sample(frac=1, random_state=0)

            self.yx_test = self.data_yx[self.data_yx.index.isin(datasplit.ids_test)]
            self.yx_test = self.yx_test.sample(frac=1, random_state=0)

        def split_into_yx(self):
            """
            Splits each set into inputs (x) and target (y).

            NOTE: x_* aliases yx_*, so pop() also removes the target column
            from yx_* — downstream code only uses x_*/y_*, so this is kept.
            """

            self.x_train = self.yx_train
            self.y_train = self.x_train.pop(self.target)

            self.x_val = self.yx_val
            self.y_val = self.x_val.pop(self.target)

            self.x_test = self.yx_test
            self.y_test = self.x_test.pop(self.target)

        def change_type(self):
            """
            Changes type of x and y of train, val and test.
            Type is changed to numpy arrays with dtype float32.
            """

            self.x_train = self.x_train.values.astype(np.float32)
            self.y_train = self.y_train.values.astype(np.float32)

            self.x_val = self.x_val.values.astype(np.float32)
            self.y_val = self.y_val.values.astype(np.float32)

            self.x_test = self.x_test.values.astype(np.float32)
            self.y_test = self.y_test.values.astype(np.float32)

        def print_params(self):
            """
            Prints the preprocessing parameters and set sizes, and writes the
            prepared data to a CSV file in the output directory.
            """
            print(f'\n--- Data Preprocessing ---\n\n'
                  f'target:                     {str(self.target)}\n'
                  f'scaling:                    {str(self.scaling)}\n'
                  f'scale target:               {self.scale_target}\n\n'
                  f'samples of training data:   {str(len(self.x_train))}\n'
                  f'samples of validation data: {str(len(self.x_val))}\n'
                  f'samples of test data:       {str(len(self.x_test))}\n')

            self.data_yx.to_csv(output_dir + 'preprocessed_data.csv')

        def inverse_transform(self, column_name, *data):
            """
            Undo the scaling of column_name for each item in data.
            Items of columns without a fitted scaler are returned unchanged.
            Returns a list with one entry per item in data.
            """

            inverse_transs = []

            for dat in data:
                if column_name in self.scalers:
                    # Scalers expect 2D input; wrap and unwrap accordingly.
                    inverse_trans = self.scalers[column_name].inverse_transform([dat])[0]
                else:
                    inverse_trans = dat

                inverse_transs.append(inverse_trans)

            return inverse_transs
    
    
    """
    Set up logging
    """
    # log_file = __file__.replace('py', 'log').split("/")[-1]
    # logging.basicConfig(
    #     level=logging.DEBUG,
    #     format="%(asctime)s [%(levelname)s] %(message)s",
    #     filename=log_dir+log_file
    )
    
    if __name__ == '__main__':
        # Smoke run: preprocess AQ-Bench for the default ozone target and
        # report the resulting split sizes.
        aq_data = Data(target='o3_average_values', scaling='robust', scale_target=False)
        aq_data.print_params()