diff --git a/prepare.sh b/prepare.sh index ce049819fdd5a6778d1252832a58ecfd913fc24c..1d0acccf20f17456c751b3d4b2789e9afba69e9f 100644 --- a/prepare.sh +++ b/prepare.sh @@ -86,6 +86,7 @@ elif [[ $MCHN == "juwels" ]] source ${activate_virt_env} pip install --upgrade pip pip install -r $STPD"requirements_juwels.txt" + python -m pip install "dask[dataframe]" --upgrade export PYTHONPATH=${CWD}:$PYTHONPATH >> ${activate_virt_env} export PYTHONPATH=${CWD}/source/:$PYTHONPATH >> ${activate_virt_env} diff --git a/source/experiments/cs_time_resolved_ozone.py b/source/experiments/cs_time_resolved_ozone.py new file mode 100644 index 0000000000000000000000000000000000000000..d4fdba5787308e03593daabb6eaf98701a8b262d --- /dev/null +++ b/source/experiments/cs_time_resolved_ozone.py @@ -0,0 +1,98 @@ +""" +Try the correct and smooth algorithm for missing data imputation +""" + +# general +import pdb +import pickle as pkl + +# data science +import numpy as np + +# sklearn +from sklearn.metrics import mean_squared_error, r2_score + +# pytorch +import torch +from models.cs import CorrectAndSmooth + +# plotting +import matplotlib.pyplot as plt + +# own package +import settings +from preprocessing.time_resolved import TimeResolvedOzone + + +print('Read in Dataset') +tro = TimeResolvedOzone() +tro.get_dataset() +dataset = tro +data = dataset[0] + +print('Baseline model') +x_train = data.x[data.train_mask].numpy() +y_train = data.y[data.train_mask].numpy().reshape(-1) +x_val = data.x[data.val_mask].numpy() +y_val = data.y[data.val_mask].numpy().reshape(-1) +# model = pkl.load(open(tro.rf_path, 'rb')) +# pdb.set_trace() +# y_val_hat = model.predict(x_val) +y_val_hat = np.full_like(y_val, fill_value=np.mean(y_train)) +rmse = (mean_squared_error(y_val, y_val_hat))**.5 +r2 = r2_score(y_val, y_val_hat) +print('======================') +print('Baseline results:') +print(f'RMSE: {rmse:.2f}, R2: {r2:.2f}') +print('======================') + +print('Correct and smooth') +cs = CorrectAndSmooth(num_correction_layers=10, correction_alpha=.75, + num_smoothing_layers=10, smoothing_alpha=0.4, + autoscale=True) # autoscale is misleading... +x = data.x.numpy() +# y_hat = model.predict(x) +y_hat = np.full_like(data.y.numpy(), fill_value=np.mean(data.y[data.train_mask].numpy())) +y_hat = torch.tensor(y_hat, dtype=torch.float32).view(-1, 1) + +y_soft = cs.correct(y_soft=y_hat, y_true=data.y[data.train_mask], + mask=data.train_mask, edge_index=data.edge_index, + edge_weight=data.edge_weight) +y_val_soft = y_soft[data.val_mask].numpy() +# pdb.set_trace() +rmse = (mean_squared_error(y_val, y_val_soft))**.5 +r2 = r2_score(y_val, y_val_soft) +print(f'After correct:') +print(f'RMSE: {rmse:.2f}, R2: {r2:.2f}') + +y_soft2 = cs.smooth(y_soft=y_soft, y_true=data.y[data.train_mask], + mask=data.train_mask, edge_index=data.edge_index, + edge_weight=data.edge_weight) +y_val_soft2 = y_soft2[data.val_mask].numpy() +rmse = (mean_squared_error(y_val, y_val_soft2))**.5 +r2 = r2_score(y_val, y_val_soft2) +print(f'After smooth:') +print(f'RMSE: {rmse:.2f}, R2: {r2:.2f}') + +exit() +print('Incoming node degree vs. 
error in test set:') +node_degrees = [] +absolute_errors = [] +for node_idx in range(data.num_nodes): + if data.train_mask[node_idx].item(): + continue + edge_weights = data.edge_weight[data.edge_index[1, :]==node_idx] + edge_weights = torch.sort(edge_weights.view(-1)).values[:-1] + node_degrees.append(torch.sum(edge_weights).item()) + y = data.y[node_idx].item() + y_hat = y_soft2[node_idx].item() + absolute_errors.append(np.abs(y-y_hat)) +plt.scatter(node_degrees, absolute_errors, color='navy', alpha=.035) +plt.title('Random Forest on time resolved ozone') +plt.xlabel('incoming node degree') +plt.ylabel('absolute error') +# plt.show() +plt.savefig(settings.output_dir+'cs_time_resolved_node_degree_vs_error.png') + + + diff --git a/source/preprocessing/time_resolved.py b/source/preprocessing/time_resolved.py index b7169947048d3bf7904a9e7b52b7047b8c8b989b..bd564f6a886774ba65653a93535aee4b7faed34f 100644 --- a/source/preprocessing/time_resolved.py +++ b/source/preprocessing/time_resolved.py @@ -8,29 +8,37 @@ import random import pdb import warnings import pickle as pkl +import datetime # data science import numpy as np import pandas as pd +import dask.dataframe as ddf # sklearn +from sklearn.preprocessing import StandardScaler +from sklearn.neighbors import NearestNeighbors from sklearn.ensemble import RandomForestRegressor from sklearn.metrics import mean_squared_error, r2_score +from sklearn.metrics.pairwise import cosine_similarity # pytorch -from torch_geometric.data import Dataset +import torch +from torch_geometric.data import Data, InMemoryDataset # own package import settings from retrieval.toar_db import StationData from retrieval.toar_db import HourlyData +from preprocessing.utils import geo_to_cartesian, get_one_hot, \ + neighbor_index_filter # oppress future warnings warnings.filterwarnings('ignore') -class TimeResolvedOzone(Dataset): +class TimeResolvedOzone(InMemoryDataset): """ A dataset for graph ML on time resolved ozone data. 
@@ -40,9 +48,13 @@ class TimeResolvedOzone(Dataset): Initialize the class """ # super class - Dataset.__init__(self) + InMemoryDataset.__init__(self) - # paths + # raw data paths + self.raw_data_dir = settings.resources_dir + 'time_resolved_raw/' + self.station_path = self.raw_data_dir + 'stations.csv' + + # preprocessed data paths self.out_dir = settings.resources_dir + 'time_resolved_preproc/' self.reg_path = self.out_dir + 'reg.csv' self.x_path = self.out_dir + 'x.csv' @@ -51,6 +63,22 @@ class TimeResolvedOzone(Dataset): self.mask_path = self.out_dir + 'mask.csv' self.imp_path = self.out_dir + 'imp.csv' self.rf_path = self.out_dir + 'rf.pkl' + self.pos_path = self.out_dir + 'pos.csv' + self.edge_path = self.out_dir + 'edge.csv' + self.edge_tmp_path = self.out_dir + 'edge_tmp.csv' + self.weight_path = self.out_dir + 'weight.csv' + + # torch dataset paths + self.dataset_dir = settings.resources_dir + 'time_resolved_dataset/' + self.x_pt_path = self.dataset_dir + 'x.pt' + self.edge_index_pt_path = self.dataset_dir + 'edge_index.pt' + self.edge_weight_pt_path = self.dataset_dir + 'edge_weights.pt' + self.y_pt_path = self.dataset_dir + 'y.pt' + self.missing_o3_mask_pt_path = self.dataset_dir + 'missing_o3_mask.pt' + self.train_mask_pt_path = self.dataset_dir + 'train_mask.pt' + self.val_mask_pt_path = self.dataset_dir + 'val_mask.pt' + self.test_mask_pt_path = self.dataset_dir + 'test_mask.pt' + self.pos_pt_path = self.dataset_dir + 'pos.pt' def get_reg(self): """ @@ -63,11 +91,20 @@ class TimeResolvedOzone(Dataset): hd = HourlyData('o3') hd.read_from_file() + # filter the datetime index and add time offset + time_offset = settings.time_offset + hd.df.index = pd.to_datetime(hd.df.index) + time_offset + start = pd.to_datetime(settings.filter_datetime_start) + end = pd.to_datetime(settings.filter_datetime_end) + filter_condition = hd.df.index.to_series().between(start, end) + hd.df = hd.df[filter_condition].copy() + # set up df n_samples_total = len(hd.df.index)*len(hd.df.columns) cols = ['station_id', 'datetime'] reg_df = pd.DataFrame(index=range(n_samples_total), columns=cols) + reg_df.index.name = 'node_index' # fill df idx = 0 @@ -75,7 +112,7 @@ class TimeResolvedOzone(Dataset): for time_idx in hd.df.index: reg_df.loc[idx] = [station_idx, time_idx] idx += 1 - if idx % 1000000 == 0: + if idx % 500000 == 0: print(f'{idx/len(reg_df)*100:.1f} %') # save @@ -84,19 +121,7 @@ class TimeResolvedOzone(Dataset): def get_x(self): """ - A file with all node features: - - hour_of_day (easy) - - day_of_year (easy) - - temp (rea) - - relhum (rea) - - cloudcover (rea) - - pblheight (rea) - - u (rea) - - v (rea) - - alt (meta) - - relative_alt (meta) - - population_density (meta) - - nightlight (meta) + A file with all node features """ print('preprocessing x...') @@ -105,23 +130,29 @@ class TimeResolvedOzone(Dataset): rea_names = ['temp', 'relhum', 'cloudcover', 'pblheight', 'u', 'v'] meta_names = ['alt', 'relative_alt', 'population_density', - 'nightlight'] + 'nightlight', 'type', 'type_of_area'] # set up df feature_names = easy_names + rea_names + meta_names reg_df = pd.read_csv(self.reg_path, index_col=0, converters={'datetime': pd.Timestamp}) - df = pd.DataFrame(columns=feature_names, index=reg_df.index) + x_df = pd.DataFrame(columns=feature_names, index=reg_df.index) # easy fields - df.hour_of_day = [d.hour for d in reg_df.datetime] - df.day_of_year = [d.dayofyear for d in reg_df.datetime] + x_df.hour_of_day = [d.hour for d in reg_df.datetime] + x_df.day_of_year = [d.dayofyear for d in 
reg_df.datetime] - # rea fields + # rea fields, filter year and add time offset for rea_name in rea_names: hd = HourlyData(rea_name) hd.read_from_file() - df[rea_name] = hd.df.values.T.reshape(-1) + time_offset = settings.time_offset + hd.df.index = pd.to_datetime(hd.df.index) + time_offset + start = pd.to_datetime(settings.filter_datetime_start) + end = pd.to_datetime(settings.filter_datetime_end) + filter_condition = hd.df.index.to_series().between(start, end) + hd.df = hd.df[filter_condition].copy() + x_df[rea_name] = hd.df.values.T.reshape(-1) # meta fields sd = StationData() @@ -129,10 +160,19 @@ class TimeResolvedOzone(Dataset): for idx, col in sd.df.iterrows(): id_filter = reg_df.station_id==idx for meta_name in meta_names: - df.loc[id_filter, meta_name] = col[meta_name] + x_df.loc[id_filter, meta_name] = col[meta_name] + + # rename fields to desired names + mapper = {'temp': 'temperature', + 'relhum': 'relative_humidity', + 'cloudcover': 'cloud_cover', + 'pblheight': 'pbl_height', + 'u': 'wind_u', + 'v': 'wind_v'} + x_df.rename(columns=mapper, inplace=True) # save - df.to_csv(self.x_path) + x_df.to_csv(self.x_path) print(f'written to {self.x_path}') def get_y(self): """ @@ -143,29 +183,43 @@ class TimeResolvedOzone(Dataset): hd = HourlyData('o3') hd.read_from_file() + # filter + time_offset = settings.time_offset + hd.df.index = pd.to_datetime(hd.df.index) + time_offset + start = pd.to_datetime(settings.filter_datetime_start) + end = pd.to_datetime(settings.filter_datetime_end) + filter_condition = hd.df.index.to_series().between(start, end) + hd.df = hd.df[filter_condition].copy() + # flatten, but move through time steps first. y_values = hd.df.values.T.reshape(-1) y_df = pd.DataFrame(index=range(len(y_values)), columns=['y'], data=y_values) + y_df.index.name = 'node_index' # save - y_df.to_csv(self.label_path) - print(f'written to {self.label_path}') + y_df.to_csv(self.y_path) + print(f'written to {self.y_path}') def get_gaps(self): """ Analyze the gaps in measurement data, their length. + + Twins are gaps of another station but with the same start + and end datetime. 
""" - print('analyzing gaps...') + print('analyzing existing gaps...') # read in data y_df = pd.read_csv(self.y_path, index_col=0) reg_df = pd.read_csv(self.reg_path, index_col=0) # set up df - gap_df = pd.DataFrame(columns=['station_id', 'type', - 'start_idx', 'len']) + columns=['station_id', 'type', 'start_datetime', + 'end_datetime', 'start_idx', 'len', 'twins'] + gap_df = pd.DataFrame(columns=columns) + gap_df.index.name = 'gap_index' # set all values to 1 and all nans to 0 y_df.loc[y_df.y==y_df.y, 'y'] = 1 @@ -178,7 +232,7 @@ class TimeResolvedOzone(Dataset): y_index_gap_start = y_df.index[0] gap_len = 0 for y_index, row in y_df.iterrows(): - if y_index % 1000000 == 0: + if y_index % 500000 == 0: print(f'{y_index/len(y_df)*100:.1f} %') y = row.y station_id = reg_df.loc[y_index, 'station_id'] @@ -189,8 +243,12 @@ class TimeResolvedOzone(Dataset): # write gap to gap df and initialize new one gap_type = 'missing_o3' if y_old== 0 else 'value' if gap_type == 'missing_o3': + start = reg_df.loc[y_index_gap_start, 'datetime'] + end = reg_df.loc[y_index_gap_start+gap_len, 'datetime'] + twins = 0 gap_df.loc[gap_idx, :] = [station_id_old, gap_type, - y_index_gap_start, gap_len] + start, end, + y_index_gap_start, gap_len, twins] # print([station_id_old, gap_type, # y_index_gap_start, gap_len]) gap_idx += 1 @@ -199,6 +257,22 @@ class TimeResolvedOzone(Dataset): y_old = y station_id_old = station_id + # find twins with same start_datetime and end datetime + print('analyzing twins of gaps...') + for idx, row in gap_df.iterrows(): + if idx % 10000 == 0: + print(f'{idx/len(gap_df)*100:.1f} %') + twins = [] + station_id = row.station_id + start = row.start_datetime + end = row.end_datetime + id_filter = gap_df.station_id != station_id + start_filter = gap_df.start_datetime == start + end_filter = gap_df.end_datetime == end + gap_df_filtered = gap_df[id_filter&start_filter&end_filter] + twins = gap_df_filtered.index.to_list() + gap_df.at[idx, 'twins'] = len(twins) + # save gap_df.to_csv(self.gap_path) print(f'written to {self.gap_path}') @@ -212,7 +286,7 @@ class TimeResolvedOzone(Dataset): n.b. 
all masks should have the same statistical properties as missing_o3_mask """ - print('Preparing masks...') + print('preparing masks...') # read in gaps catalogue and ozone df gap_df = pd.read_csv(self.gap_path, index_col=0) @@ -224,6 +298,7 @@ class TimeResolvedOzone(Dataset): 'test_mask'] mask_df = pd.DataFrame(columns=columns, index=y_df.index, data=False) + mask_df.index.name = 'node_index' # missing o3 is simply where we do not have any measurement mask_df.loc[y_df.y!=y_df.y, 'missing_o3_mask'] = True @@ -270,15 +345,19 @@ class TimeResolvedOzone(Dataset): mask_df_filtered = mask_df[start_idx:end_idx] missing_values = len(mask_df_filtered[ (mask_df_filtered==True).any(axis=1)]) - if missing_values *10 > len_: + if missing_values * 10 > len_: print('Too many missing values') continue # set those to true that are not true elsewhere index_list = mask_df_filtered[ (mask_df_filtered==False).all(axis=1)].index mask_df.at[index_list, mask] = True - gap_df.loc[len(gap_df), :] = [start_station, mask, - start_idx, len_] + start_datetime = reg_df.at[start_idx, 'datetime'] + end_datetime = reg_df.at[end_idx, 'datetime'] + twins = 0 + _ = [start_station, mask, start_datetime, + end_datetime, start_idx, len_, twins] + gap_df.loc[len(gap_df), :] = _ masks_found += 1 print(start_station, missing_values, len_) @@ -286,27 +365,37 @@ class TimeResolvedOzone(Dataset): index_list = mask_df[(mask_df==False).all(axis=1)].index mask_df.at[index_list, 'train_mask'] = True + # save mask_df.to_csv(self.mask_path) print(f'written to {self.mask_path}') gap_df.to_csv(self.gap_path) print(f'written to {self.gap_path}') - def get_rf(self): + def get_rf(self, features=None, return_r2=False, save=True): """ Fit a simple model to x and y, then save it + + A feature list and the option to return the r2 are given + for feature selection purposes. """ - print('Baselines...') + if not return_r2: + print('baselines...') # read in data x_df = pd.read_csv(self.x_path, index_col=0) y_df = pd.read_csv(self.y_path, index_col=0) mask_df = pd.read_csv(self.mask_path, index_col=0) + # choosing features, one-hot encoding + if features: + x_df = x_df[features] + x_df = get_one_hot(x_df) + # prepare data x_train = x_df[mask_df.train_mask].to_numpy() + x_val = x_df[mask_df.val_mask].to_numpy() y_train = y_df[mask_df.train_mask].to_numpy().reshape(-1) - x_test = x_df[mask_df.test_mask].to_numpy() - y_test = y_df[mask_df.test_mask].to_numpy().reshape(-1) + y_val = y_df[mask_df.val_mask].to_numpy().reshape(-1) # fit model rf = RandomForestRegressor(random_state=settings.random_seed, @@ -315,13 +404,46 @@ class TimeResolvedOzone(Dataset): rf.fit(x_train, y_train) # evaluate - y_test_hat = rf.predict(x_test) - rmse = (mean_squared_error(y_test, y_test_hat))**.5 - r2 = r2_score(y_test, y_test_hat) - print('======================') - print('Baseline results:') - print(f'RMSE: {rmse:.2f}, R2: {r2:.2f}') - print('======================') 
+ y_val_hat = rf.predict(x_val) + y_train_hat = rf.predict(x_train) + rmse_train = (mean_squared_error(y_train, y_train_hat))**.5 + r2_train = r2_score(y_train, y_train_hat) + rmse_val = (mean_squared_error(y_val, y_val_hat))**.5 + r2_val = r2_score(y_val, y_val_hat) + + # if the r2 is not returned, print stats + if not return_r2: + print('======================') + if features: + print('Features') + print([f for f in features]) + print('Baseline results for train:') + print(f'RMSE: {rmse_train:.2f}, R2: {r2_train:.2f}') + print('Baseline results for val:') + print(f'RMSE: {rmse_val:.2f}, R2: {r2_val:.2f}') + print('======================') + + # save + if save: + pkl.dump(rf, open(self.rf_path, 'wb')) + print(f'written to {self.rf_path}') + + # return metric + if return_r2: + return r2_val + + def get_imp(self, features=None): + """ + Feature importances of RF to define the graph structure + """ + print('importances of baseline model...') + + # read in data and model + x_df = pd.read_csv(self.x_path, index_col=0) + if features: + x_df = x_df[features] + x_df = get_one_hot(x_df) + rf = pkl.load(open(self.rf_path, 'rb')) # feature importances imp_df = pd.DataFrame(columns=['feature', 'importance'], @@ -330,27 +452,347 @@ class TimeResolvedOzone(Dataset): imp_df.importance = rf.feature_importances_ # save - pkl.dump(rf, open(self.rf_path, 'wb')) - print(f'written to {self.rf_path}') imp_df.to_csv(self.imp_path) print(f'written to {self.imp_path}') + def get_pos(self): + """ + Position of every node: lon and lat + """ + print('preprocessing positions of nodes...') + + # read in data + reg_df = pd.read_csv(self.reg_path, index_col=0) + sd = StationData() + sd.read_from_file() + station_df = sd.df + + # initialize position data frame + pos_df = pd.DataFrame(index=reg_df.index, + columns=['lon', 'lat', 'datetime']) + pos_df.index.name = 'node_index' - def get_imp(self): + # write positions to df + for idx, col in station_df.iterrows(): + id_filter = reg_df.station_id==idx + for coordinate in ['lon', 'lat']: + pos_df.loc[id_filter, coordinate] = col[coordinate] + pos_df.datetime = reg_df.datetime + + # save + pos_df.to_csv(self.pos_path) + print(f'written to {self.pos_path}') + + def get_edges(self): """ - Feature importances of RF to define the graph structure + Collect all edges. This routine best runs on the mem192 + partition of JUWELS. + + This routine also produces the edge_tmp.csv file, + which makes calculating edge weights a lot faster. """ - pass + print('preprocessing edges...') -if __name__ == '__main__': + # settings for edges, radius in km and time window + radius = 50. + time_window = pd.Timedelta('0 days 06:00:00') + + # read in data + station_df = pd.read_csv(self.station_path, index_col=0) + reg_df = pd.read_csv(self.reg_path, index_col=0, + parse_dates=['datetime']) + # reg_df = reg_df[0:int(1/1000*len(reg_df))].copy() <-- for testing + + # initialize edge dataframe, n.b. it is reorganized later + edge_df = pd.DataFrame(index=reg_df.index, + columns=['source_indices']) + edge_df.index.name = 'target_index' + + # find neighbors of stations in given radius + neighbor_df = pd.DataFrame(index=station_df.index, + columns=['neighbors']) + ids = station_df.index + x_, y_, z_ = geo_to_cartesian(station_df.lon.values, + station_df.lat.values) + coords = np.array([x_, y_, z_]).T + nn = NearestNeighbors() + nn.fit(coords) + _, idx_lists = nn.radius_neighbors(coords, + radius=50.) 
+ for trg_idx, src_idx_list in enumerate(idx_lists): + trg_id = ids[trg_idx] + src_ids = [ids[src_idx] for src_idx in sorted(src_idx_list)] + neighbor_df.at[trg_id, 'neighbors'] = src_ids + + # edges exist if measurements are at neighboring stations + # and in the given time window + reg_ddf = ddf.from_pandas(reg_df, npartitions=6) + datetime_start = datetime.datetime.now() + print(datetime_start) + args = (neighbor_df, reg_df, radius, time_window) + edge_dseries = reg_ddf.apply(neighbor_index_filter, + args=args, + axis=1, + meta=edge_df['source_indices']) + edge_series = edge_dseries.compute(scheduler='multiprocessing') + edge_df.source_indices = edge_series + print(datetime.datetime.now() - datetime_start) + + # save temporary edge_df + edge_df.to_csv(self.edge_tmp_path) + print(f'written to {self.edge_tmp_path}') + + # reorganize the edge_df, so one edge gets one row. + source_index_list, target_index_list = [], [] + for index, row in edge_df.iterrows(): + source_indices = row.source_indices + target_index = index + source_index_list += source_indices + target_index_list += [target_index]*len(source_indices) + edge_df = pd.DataFrame() + edge_df['source_node_index'] = source_index_list + edge_df['target_node_index'] = target_index_list + edge_df.index.name = 'edge_index' + + # save + edge_df.to_csv(self.edge_path) + print(f'written to {self.edge_path}') + + def get_edge_weights(self, features=None): + """ + Weight edges by cosine similarity + """ + print('preprocessing edge weights...') + + # read in data + edge_tmp_df = pd.read_csv(self.edge_tmp_path, index_col=0) + x_df = pd.read_csv(self.x_path, index_col=0) + imp_df = pd.read_csv(self.imp_path, index_col=0) + + # prepare x + if features: + x_df = x_df[features] + x_df = get_one_hot(x_df) + for feature in x_df.columns: + scaler = StandardScaler() + scale_data = x_df[feature].to_numpy().reshape(-1, 1) + imp = float(imp_df[imp_df.feature==feature].importance.values) + x_df[feature] = scaler.fit_transform(scale_data) * imp + + # iterate through all node indices + edge_weight_list = [] + for target_node_index in edge_tmp_df.index: + if target_node_index % 500000 == 0: + print(f'{target_node_index/len(edge_tmp_df)*100:.1f} %') + source_node_indices = eval(edge_tmp_df.at[target_node_index, 'source_indices']) + x_source_nodes = x_df.loc[source_node_indices] + x_target_node = x_df.loc[target_node_index] + weights_source_nodes = cosine_similarity(x_source_nodes, + np.array(x_target_node).reshape(1, -1) + ).clip(0).reshape(-1) + weights_source_nodes = weights_source_nodes/sum(weights_source_nodes) + edge_weight_list += weights_source_nodes.tolist() + + # edge weight dataframe + weight_df = pd.DataFrame(columns=['edge_weight']) + weight_df['edge_weight'] = edge_weight_list + weight_df.index.name = 'edge_index' + + # save + weight_df.to_csv(self.weight_path) + print(f'written to {self.weight_path}') + + def convert_to_tensor(self, features=None): + """ + Convert all the .csv files to tensors + """ + print('converting to pytorch tensors...') + + # x + x_df = pd.read_csv(self.x_path, index_col=0) + if features: + x_df = x_df[features] + x_df = get_one_hot(x_df) + x = torch.tensor(x_df.values.astype(np.float32)) + torch.save(x, self.x_pt_path) + print(f'written to {self.x_pt_path}') + del x_df + + # edges + edge_df = pd.read_csv(self.edge_path, index_col=0) + src_list = edge_df.source_node_index.to_list() + trg_list = edge_df.target_node_index.to_list() + edge_index = torch.tensor(np.array([src_list, trg_list])).detach() + torch.save(edge_index, 
self.edge_index_pt_path) + print(f'written to {self.edge_index_pt_path}') + del edge_df + + # edge weights + weight_df = pd.read_csv(self.weight_path, index_col=0) + edge_weights = torch.tensor(weight_df.values.astype(np.float32)).view(-1, 1) + torch.save(edge_weights, self.edge_weight_pt_path) + print(f'written to {self.edge_weight_pt_path}') + del weight_df + + # y + y_df = pd.read_csv(self.y_path, index_col=0) + y = torch.tensor(y_df.values.astype(np.float32)).view(-1, 1) + torch.save(y, self.y_pt_path) + print(f'written to {self.y_pt_path}') + del y_df + + # masks + mask_df = pd.read_csv(self.mask_path, index_col=0) + missing_o3_mask = torch.tensor(mask_df.missing_o3_mask).view(-1) + torch.save(missing_o3_mask, self.missing_o3_mask_pt_path) + print(f'written to {self.missing_o3_mask_pt_path}') + train_mask = torch.tensor(mask_df.train_mask).view(-1) + torch.save(train_mask, self.train_mask_pt_path) + print(f'written to {self.train_mask_pt_path}') + val_mask = torch.tensor(mask_df.val_mask).view(-1) + torch.save(val_mask, self.val_mask_pt_path) + print(f'written to {self.val_mask_pt_path}') + test_mask = torch.tensor(mask_df.test_mask).view(-1) + torch.save(test_mask, self.test_mask_pt_path) + print(f'written to {self.test_mask_pt_path}') + del mask_df + + # position + pos_df = pd.read_csv(self.pos_path, index_col=0) + pos = torch.tensor(pos_df[['lon', 'lat']].values.astype(np.float32)) + torch.save(pos, self.pos_pt_path) + print(f'written to {self.pos_pt_path}') + del pos_df + + def get_dataset(self, features=None): + """ + Finalize the Pytorch dataset, include everything that is + in the gAQBench dataset. + """ + # read in data + x = torch.load(self.x_pt_path) + edge_index = torch.load(self.edge_index_pt_path) + edge_weight = torch.load(self.edge_weight_pt_path) + y = torch.load(self.y_pt_path) + missing_o3_mask = torch.load(self.missing_o3_mask_pt_path) + train_mask = torch.load(self.train_mask_pt_path) + val_mask = torch.load(self.val_mask_pt_path) + test_mask = torch.load(self.test_mask_pt_path) + pos = torch.load(self.pos_pt_path) + + # create dataset + data = Data(x=x, edge_index=edge_index, edge_weight=edge_weight, + y=y, missing_o3_mask=missing_o3_mask, + train_mask=train_mask, val_mask=val_mask, + test_mask=test_mask, pos=pos) + self.data, self.slices = self.collate([data]) + + +def main(): """ - Create the dataset for testing purposes. + Full workflow to create the dataset """ + # features are important for the simple model and the edge weights + features = ['hour_of_day', 'day_of_year', 'relative_alt', + 'pbl_height', 'temperature', 'alt', + 'population_density', 'type', 'wind_v', 'wind_u', + 'cloud_cover'] tro = TimeResolvedOzone() # tro.get_reg() # tro.get_x() # tro.get_y() # tro.get_gaps() # tro.get_mask() - tro.get_rf() - tro.get_imp() + # tro.get_rf(features=features) + # tro.get_imp(features=features) + # tro.get_pos() + # tro.get_edges() + # tro.get_edge_weights(features=features) + tro.convert_to_tensor(features=features) + tro.get_dataset() + dataset = tro + + print(f'Dataset: {dataset}:') + print('======================') + print(f'Number of graphs: {len(dataset)}') + print(f'Number of node features: {dataset.num_features}') + print(f'Number of edge features: {dataset.num_edge_features}') + data = dataset[0] # Get the first graph object. + print(data) + print('======================') + # Gather some statistics about the graph. 
+ print(f'Number of nodes: {data.num_nodes}') + print(f'Number of edges: {data.num_edges}') + print(f'Average node degree: {(2*data.num_edges) / data.num_nodes:.2f}') + print(f'Number of training nodes: {data.train_mask.sum()}') + print(f'Training node share: {int(data.train_mask.sum()) / data.num_nodes:.2f}') + print(f'Contains isolated nodes: {data.has_isolated_nodes()}') + print(f'Contains self-loops: {data.has_self_loops()}') + print(f'Is undirected: {data.is_undirected()}') + + +def feature_selection(): + """ + Forward feature selection for the simple model + """ + # in itialize class + tro = TimeResolvedOzone() + + # read in data + x_df = pd.read_csv(tro.x_path, index_col=0) + all_features = x_df.columns + + # create pairs + pair_list = [] + for f1 in all_features: + for f2 in all_features: + condition1 = f1 != f2 + condition2 = [f2, f1] not in pair_list + if (condition1 and condition2): + pair_list.append([f1, f2]) + + # train on all pairs and append to r2 list + print('\ntesting 2 features:') + r2_list = [] + for pair in pair_list: + r2 = tro.get_rf(features=pair, return_r2=True, save=False) + r2_list.append(r2) + print(f'{pair}, {r2:.3f}') + + # look for the winning feature + chosen_features = pair_list[r2_list.index(max(r2_list))] + best_r2 = max(r2_list) + print('\nwinners:') + print(f'{chosen_features}, {best_r2:.3f}') + + # append new features iteratively until r2 decreases + new_best_r2 = 1. + while new_best_r2 > best_r2: + print(f'\ntesting {len(chosen_features)+1} features:') + candidates_list = [] + new_r2_list = [] + for feature in all_features: + if feature in chosen_features: + continue + candidates = chosen_features + [feature] + r2 = tro.get_rf(features=candidates, return_r2=True, + save=False) + candidates_list.append(candidates) + new_r2_list.append(r2) + print(f'{candidates}, {r2:.3f}') + chosen_features = candidates_list[new_r2_list.index( + max(new_r2_list))] + new_best_r2 = max(new_r2_list) + print('winners:') + print(f'{chosen_features}, {new_best_r2:.3f}') + + # todo: new best r2 and best r2 are not comparable + + +if __name__ == '__main__': + """ + Start routines + """ + main() + # feature_selection() + diff --git a/source/preprocessing/utils.py b/source/preprocessing/utils.py index e892c106ebed3f9db264b3a6d4e42571767a3b37..ba809a1d26130ede19ca1e36a9d9e3a708c566f4 100644 --- a/source/preprocessing/utils.py +++ b/source/preprocessing/utils.py @@ -1,10 +1,19 @@ +# general +import pdb + +# data science import numpy as np +import pandas as pd import torch + def geo_to_cartesian(lons, lats): """ Maps longitudes and latitudes onto a 3d grid which is practical for distance measures + + inputs: lons, lats in degree, dtype is series. + returns: numpy arrays """ r = 6371. 
@@ -16,3 +25,35 @@ def geo_to_cartesian(lons, lats): z = r * np.sin(lats) return x, y, z + + +def neighbor_index_filter(row, neighbor_df, reg_df, radius, time_window): + """ + For a given row in reg_df, find all neighboring nodes + """ + + trg_id = row.station_id + trg_datetime = row.datetime + neigh_ids = neighbor_df.at[trg_id, 'neighbors'] + + rad_filter = reg_df.station_id.isin(neigh_ids) + time_filter = abs(reg_df.datetime-trg_datetime) <= time_window + + src_idxs = reg_df[rad_filter&time_filter].index.to_list() + + return src_idxs + + +def get_one_hot(x_df): + """ + If this data frame contains 'type' or 'type_of_area', + replace them with one-hot encoded columns + """ + + for col in ['type', 'type_of_area']: + if col in x_df.columns: + dummies = pd.get_dummies(x_df[col], prefix=col) + x_df.drop(col, axis=1, inplace=True) + x_df = pd.concat([x_df, dummies], axis=1) + + return x_df.copy() diff --git a/source/settings.py b/source/settings.py index 6aa003b2ac2eb5b3ec733ea1cadf4ba59b8b01ea..0d451f948169517d8f49860974d9c175c649d974 100644 --- a/source/settings.py +++ b/source/settings.py @@ -7,6 +7,7 @@ and data files that we use in our project. # basic packages import os import pathlib +import pandas as pd import pdb # find the source of our project, only to know the directory @@ -31,6 +32,10 @@ random_seed = 1 datetime_start = '2009-01-01 00:00:00' datetime_end = '2013-12-31 23:59:59' +filter_datetime_start = '2011-01-01 00:00:00' +filter_datetime_end = '2011-12-31 23:59:59' + +time_offset = pd.Timedelta('0 days 02:00:00') if __name__ == '__main__': print('SOURCEDIR:', SOURCEDIR_pos)
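
Note: as a quick sanity check of the new dataset plumbing, the snippet below mirrors the calls already used in main() and in source/experiments/cs_time_resolved_ozone.py. It is a minimal sketch, assuming the preprocessing steps in main() have already been run so that the .pt tensors exist under settings.resources_dir + 'time_resolved_dataset/'.

# minimal usage sketch (assumes convert_to_tensor() has already written the tensors)
from preprocessing.time_resolved import TimeResolvedOzone

tro = TimeResolvedOzone()
tro.get_dataset()   # loads x, edge_index, edge_weight, y, masks and pos
data = tro[0]       # the whole graph as a single Data object

print(data)
print(f'train nodes: {int(data.train_mask.sum())}')
print(f'val nodes: {int(data.val_mask.sum())}')
print(f'missing o3 nodes: {int(data.missing_o3_mask.sum())}')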