import numpy as np
import pandas as pd
from sklearn import preprocessing
# import logging

from settings import *
from dataset_datasplit import DataSplit

__author__ = 'Timo Stomberg, Clara Betancourt'


class Data:
    """
    Preprocesses the AQ-Bench data and stores the results in the
    respective attributes.
    """

    def __init__(self, target, scaling, scale_target):
        self.target = target              # column chosen as the target (label, y-value)
        self.scaling = scaling            # scaling method ('normalize' or 'robust')
        self.scale_target = scale_target  # whether the target column is scaled as well

        self.data = pd.read_csv(resources_dir + AQbench_dataset_file)    # all AQ-Bench data
        self.data = self.data.set_index('id')
        self.info = pd.read_csv(resources_dir + AQbench_variables_file)  # metadata for all AQ-Bench columns

        self.scalers = {}              # fitted scalers, keyed by column name
        self.data_yx = pd.DataFrame()  # contains all prepared data
        self.info_yx = pd.DataFrame()  # contains the metadata of all prepared columns
        self.inputs = []               # names of all input columns

        self.yx_train = []  # inputs and labels, one attribute per set
        self.yx_val = []
        self.yx_test = []
        self.x_train = []   # input data, one attribute per set
        self.x_val = []
        self.x_test = []
        self.y_train = []   # labels, one attribute per set
        self.y_val = []
        self.y_test = []

        # Run the preprocessing pipeline.
        self.choose_columns()
        self.delete_rows()
        self.transform()
        self.list_inputs()
        self.split_into_sets()
        self.split_into_yx()
        self.change_type()

    def choose_columns(self):
        """
        Looks up the needed columns in the info table and keeps only
        those columns of the data.
        """
        # Save the info of the chosen target (label) to info_y.
        info_y = self.info[self.info['column_name'] == self.target]
        # Save the info of all input columns to info_x.
        info_x = self.info[self.info['input_target'] == 'input']
        # Concatenate info_y and info_x; the result lists all needed columns.
        self.info_yx = pd.concat([info_y, info_x])
        # Keep only the needed columns. The copy avoids chained-assignment
        # warnings when columns are modified later in transform().
        self.data_yx = self.data[self.info_yx['column_name'].to_list()].copy()

        # logging.info(f'{self.data_yx.shape[0]} rows were read in.')

    def delete_rows(self):
        """
        Deletes every row in which at least one value is missing,
        i.e. equals the fill value of its column.
        """
        for idx, row in self.info_yx.iterrows():
            column_name = row['column_name']
            fill_value = row['fill_value']
            # Fill values are stored as strings; compare numerically if possible.
            try:
                fill_value = float(fill_value)
            except (ValueError, TypeError):
                fill_value = str(fill_value)
            self.data_yx = self.data_yx[self.data_yx[column_name] != fill_value]

        # logging.info(f'{self.data_yx.shape[0]} rows are remaining after deleting invalid ones.')

    def transform(self):
        """
        For columns marked 'scale', scales the values with the chosen scaler.
        For categorical columns ('one-hot'), creates one-hot columns.
        Deletes all columns with circular data.
        """
        for idx, row in self.info_yx.iterrows():
            column_name = row['column_name']
            if row['preparation'] == 'scale' and (column_name != self.target or self.scale_target):
                # StandardScaler removes the mean and scales to unit variance;
                # RobustScaler uses the median and interquartile range instead,
                # which makes it less sensitive to outliers.
                if self.scaling == 'normalize':
                    self.scalers[column_name] = preprocessing.StandardScaler()
                    self.data_yx[column_name] = self.scalers[column_name].fit_transform(self.data_yx[[column_name]])
                elif self.scaling == 'robust':
                    self.scalers[column_name] = preprocessing.RobustScaler()
                    self.data_yx[column_name] = self.scalers[column_name].fit_transform(self.data_yx[[column_name]])
            elif row['preparation'] == 'one-hot' and column_name != self.target:
                # Add a 0/1 column for each category and delete the original column.
                one_hot = pd.get_dummies(self.data_yx[column_name], prefix=column_name)
                self.data_yx = pd.concat([self.data_yx, one_hot], axis=1)
                del self.data_yx[column_name]
            elif row['preparation'] == 'circular' and column_name != self.target:
                del self.data_yx[column_name]
            elif column_name != self.target:
                print(f"!!! Warning !!! Preparation could not be found: {row['column_name']}")
                # logging.warning(f"!!! Warning !!! Preparation could not be found: {row['column_name']}")
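
        # Illustration of the one-hot step (hypothetical column and values):
        # a categorical column such as 'climatic_zone' with the entries
        # {'tropical', 'polar'} is replaced by the 0/1 columns
        # 'climatic_zone_tropical' and 'climatic_zone_polar'.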

    def list_inputs(self):
        """
        Saves all input column names to self.inputs.
        """
        self.inputs = list(self.data_yx)
        self.inputs.remove(self.target)

    def split_into_sets(self):
        """
        Splits the data into train, validation and test sets and
        shuffles each set.
        """
        datasplit = DataSplit()
        self.yx_train = self.data_yx[self.data_yx.index.isin(datasplit.ids_train)]
        self.yx_train = self.yx_train.sample(frac=1, random_state=0)  # shuffle; sample() returns a new frame
        self.yx_val = self.data_yx[self.data_yx.index.isin(datasplit.ids_val)]
        self.yx_val = self.yx_val.sample(frac=1, random_state=0)
        self.yx_test = self.data_yx[self.data_yx.index.isin(datasplit.ids_test)]
        self.yx_test = self.yx_test.sample(frac=1, random_state=0)
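
        # Note: DataSplit (from dataset_datasplit.py) is assumed to expose the
        # fixed station id lists ids_train, ids_val and ids_test used above.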

    def split_into_yx(self):
        """
        Splits the data into x and y (inputs and target).
        """
        # DataFrame.pop removes the target column in place and returns it,
        # so the yx_* frames are reduced to the inputs in the same step.
        self.x_train = self.yx_train
        self.y_train = self.x_train.pop(self.target)
        self.x_val = self.yx_val
        self.y_val = self.x_val.pop(self.target)
        self.x_test = self.yx_test
        self.y_test = self.x_test.pop(self.target)

    def change_type(self):
        """
        Converts x and y of train, val and test to numpy arrays
        of type float32.
        """
        self.x_train = self.x_train.values.astype(np.float32)
        self.y_train = self.y_train.values.astype(np.float32)
        self.x_val = self.x_val.values.astype(np.float32)
        self.y_val = self.y_val.values.astype(np.float32)
        self.x_test = self.x_test.values.astype(np.float32)
        self.y_test = self.y_test.values.astype(np.float32)

    def print_params(self):
        """
        Prints the preprocessing parameters and set sizes, and writes
        the preprocessed data to a CSV file.
        """
        print(f'\n--- Data Preprocessing ---\n\n'
              f'target: {self.target}\n'
              f'scaling: {self.scaling}\n'
              f'scale target: {self.scale_target}\n\n'
              f'samples of training data: {len(self.x_train)}\n'
              f'samples of validation data: {len(self.x_val)}\n'
              f'samples of test data: {len(self.x_test)}\n')
        self.data_yx.to_csv(output_dir + 'preprocessed_data.csv')

    def inverse_transform(self, column_name, *data):
        """
        Transforms scaled data back to its original scale, using the scaler
        fitted for the given column. Unscaled columns are returned unchanged.
        """
        inverse_transs = []
        for dat in data:
            if column_name in self.scalers:
                inverse_trans = self.scalers[column_name].inverse_transform([dat])[0]
            else:
                inverse_trans = dat
            inverse_transs.append(inverse_trans)
        return inverse_transs
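
    # Usage sketch (y_pred is a hypothetical array of scaled predictions;
    # only meaningful for the target if scale_target=True):
    #   y_pred_orig = data.inverse_transform(data.target, y_pred)[0]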
"""
Set up logging
"""
# log_file = __file__.replace('py', 'log').split("/")[-1]
# logging.basicConfig(
# level=logging.DEBUG,
# format="%(asctime)s [%(levelname)s] %(message)s",
# filename=log_dir+log_file
)


if __name__ == '__main__':
    data = Data(target='o3_average_values', scaling='robust', scale_target=False)
    data.print_params()
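
    # Downstream usage sketch (illustrative only; the regressor choice is an
    # assumption, not part of this module):
    #   from sklearn.ensemble import RandomForestRegressor
    #   model = RandomForestRegressor(random_state=0)
    #   model.fit(data.x_train, data.y_train)
    #   print('validation R^2:', model.score(data.x_val, data.y_val))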