Commit 963d6fcc authored by mova's avatar mova
Browse files

add preprocessing with saving to disk

parent e6707923
......@@ -66,18 +66,18 @@ def main():
prediction_procedure()
if conf["command"] == "profile":
from .ml.profile import profile_procedure
if conf["command"] == "preprocess":
from .utils.preprocess import preprocess_procedure
profile_procedure()
preprocess_procedure()
if conf["command"] == "loadfile":
fn = str(conf.file_to_load)
file_name = str(conf.file_to_load)
import re
fn = re.sub(".*fgsim/(.*?).py", ".\\1", fn)
fn = re.sub("/", ".", fn)
importlib.import_module(fn, "fgsim")
file_name = re.sub(".*fgsim/(.*?).py", ".\\1", file_name)
file_name = re.sub("/", ".", file_name)
importlib.import_module(file_name, "fgsim")
if conf["command"] == "dump":
from .utils import dump_training
......
tag: default
loglevel: 1
path:
dataset: 'data/clic'
ds_lenghts: '${path.dataset}/filelengths.yaml'
validation: 'data/clic_valtest/validation.torch'
test: 'data/clic_valtest/test.torch'
run_path: 'wd/${tag}/${hash}'
comet_exp_key: "${path.run_path}/comet_experiment_key"
log: "${path.run_path}/log"
train_config: "${path.run_path}/resulting_train_config.yaml"
full_config: "${path.run_path}/full_config.yaml"
tensorboard: "${path.run_path}/tb"
checkpoint: "${path.run_path}/checkpoint.torch"
checkpoint_old: "${path.run_path}/checkpoint_old.torch"
predict_csv: "${path.run_path}/prediction.csv"
complete_state: "${path.run_path}/complete.yaml"
dataset: "data/hgcal_william"
dataset_glob: "**/*.root"
dataset_processed: "${path.dataset}/processed"
ds_lenghts: "${path.dataset_processed}/filelengths.yaml"
validation: "${path.dataset_processed}/validation.pt"
test: "${path.dataset_processed}/test.pt"
training: "${path.dataset_processed}/training"
training_glob: "*.pt"
geo_lup: "data/hgcal/DetIdLUT_full.root"
run_path: "wd/${tag}/${hash}"
train_config: "${path.run_path}/resulting_train_config.yaml"
full_config: "${path.run_path}/full_config.yaml"
tensorboard: "${path.run_path}/tb"
checkpoint: "${path.run_path}/checkpoint.torch"
checkpoint_old: "${path.run_path}/checkpoint_old.torch"
predict_csv: "${path.run_path}/prediction.csv"
complete_state: "${path.run_path}/complete.yaml"
comet_exp_key: "${path.run_path}/comet_experiment_key"
log: "${path.run_path}/log"
seed: 0
loader:
keylist: [
"energy",
"ECAL",
"ECAL_E",
"HCAL",
"HCAL_E",
"recoEta",
"recoPhi",
"recoTheta",
]
chunksize: 100
prefetch_batches: 2
batch_size : 100
num_workers_transform: 20
num_workers_stack: 10
validation_set_size: 5000
test_set_size: 30000
transform_workers: 15
nlayers: 25
loader: ${loader_options[${loader_name}]}
model:
name: "default"
dyn_features: 10
static_features: 3
deeplayer_nodes: 50
nprop: 20
name: "default"
dyn_features: 10
static_features: 3
deeplayer_nodes: 50
nprop: 20
training:
events_processed_before_validation: 100000
validation_interval: '${div:${training.events_processed_before_validation},${loader.batch_size}}'
early_stopping:
validation_steps: 40
improvement: 0.02
yvar : "energy"
events_processed_before_validation: 100000
validation_interval: "${div:${training.events_processed_before_validation},${loader.batch_size}}"
early_stopping:
validation_steps: 40
improvement: 0.02
yvar: "energy"
loss:
name: L1Loss
name: L1Loss
optimizer:
name: Adam
parameters: ${loss_params[${optimizer.name}]}
name: Adam
parameters: ${loss_options[${optimizer.name}]}
loss_params:
Adam:
weight_decay: 0.0001
lr : 0.0002
SGD:
lr : 0.0002
loss_options:
Adam:
weight_decay: 0.0001
lr: 0.0002
SGD:
lr: 0.0002
loader_options:
clic:
qf_seq_name: "clic_seq"
keylist:
[
"energy",
"ECAL",
"ECAL_E",
"HCAL",
"HCAL_E",
"recoEta",
"recoPhi",
"recoTheta",
]
chunksize: 100
batch_size: 100
validation_set_size: 5000
test_set_size: 30000
prefetch_batches: 2
num_workers_transform: 30
num_workers_stack: 2
hgcal:
qf_seq_name: "hgcal_seq"
rootprefix: "treeMaker/tree"
preprocess_training: True
keylist: ["genPh_E", "simHit_detid", "simHit_E"]
hlvs:
[
"sum_energy",
"num_isolated",
"isolated_energy",
"isolated_E_fraction",
"x_mean",
"x_std",
"x_mom3",
"y_mean",
"y_std",
"y_mom3",
"z_mean",
"z_std",
"z_mom3",
]
cell_prop_keys: ["x", "y", "z", "celltype", "issilicon"]
braches:
id: "simHit_detid"
energy: "genPh_E"
hit_energy: "simHit_E"
chunksize: 100
batch_size: 100
validation_set_size: 5000
test_set_size: 30000
prefetch_batches: 2
num_workers_transform: 30
num_workers_stack: 2
......@@ -2,9 +2,7 @@
Conversion from list of hit to graph
"""
# import sys
from os.path import isfile, splitext
from pathlib import Path
from typing import Dict
import awkward as ak
......@@ -20,8 +18,8 @@ from torch_geometric.data import Data as GraphType
from fgsim.config import conf
from fgsim.utils.logger import logger
pickle_lup_path = splitext("conf.path.geo_lup")[0] + ".pd"
if not isfile(pickle_lup_path):
pickle_lup_path = Path(conf.path.geo_lup).with_suffix(".pd")
if not pickle_lup_path.is_file():
with uproot.open(conf.path.geo_lup) as rf:
geo_lup = rf["analyzer/tree;1"].arrays(library="ak")
geo_lup = ak.to_pandas(geo_lup)
......
"""
Here the steps for reading the root files and processing the hit list \
to graphs are defined. `process_seq` is the function that \
should be passed the qfseq.
"""
from pathlib import Path
from typing import List
import torch
from torch.multiprocessing import Queue
from torch_geometric.data import Data as GraphType
from fgsim.config import conf
from fgsim.io import qf
# Load files
# Ensure the output directory for the preprocessed training files exists
# before anything tries to write into it.
dataset_path = Path(conf.path.training)
dataset_path.mkdir(parents=True, exist_ok=True)
# reading from the filesystem
def read_file(file: Path) -> GraphType:
batch_list: List[GraphType] = torch.load(file)
return batch_list
# Collect the steps
def preprocessed_seq():
return (
qf.ProcessStep(read_file, 2, name="read_chunk"),
qf.pack.UnpackStep(),
Queue(conf.loader.prefetch_batches),
)
......@@ -5,6 +5,7 @@ loaded depending on `conf.loader.name`.
import importlib
import os
from pathlib import Path
import numpy as np
import torch
......@@ -12,11 +13,12 @@ import torch
from fgsim.config import conf
from fgsim.geo.batchtype import DataSetType
from fgsim.io import qf
from fgsim.io.preprocessed_seq import preprocessed_seq
from fgsim.io.qf.sequence import Sequence as qfseq
from fgsim.utils.logger import logger
# Import the specified processing sequence
sel_seq = importlib.import_module(f"fgsim.io.{conf.loader.name}", "fgsim.models")
sel_seq = importlib.import_module(f"fgsim.io.{conf.loader.qf_seq_name}")
process_seq = sel_seq.process_seq
files = sel_seq.files
......@@ -77,12 +79,12 @@ must queue an epoch via `queue_epoch()` and iterate over the instance of the cla
n_validation_chunks = conf.loader.validation_set_size // chunksize
assert conf.loader.test_set_size % chunksize == 0
n_test_batches = conf.loader.test_set_size // batch_size
self.n_test_batches = conf.loader.test_set_size // batch_size
n_testing_chunks = conf.loader.test_set_size // chunksize
logger.info(
f"Using the first {n_validation_batches} batches for "
+ f"validation and the next {n_test_batches} batches for testing."
+ f"validation and the next {self.n_test_batches} batches for testing."
)
self.validation_chunks = chunk_coords[:n_validation_chunks]
......@@ -102,25 +104,28 @@ must queue an epoch via `queue_epoch()` and iterate over the instance of the cla
# Assign the sequence with the specifice steps needed to process the dataset.
self.qfseq = qf.Sequence(*process_seq())
if not os.path.isfile(conf.path.validation):
logger.warning(
f"""\
Processing validation batches, queuing {len(self.validation_chunks)} chunks."""
)
self.qfseq.queue_iterable(self.validation_chunks)
self._validation_batches = [batch for batch in self.qfseq]
torch.save(self._validation_batches, conf.path.validation)
logger.warning("Validation batches pickled.")
if conf.loader.preprocess_training:
self.preprocessed_files = [
str(e)
for e in sorted(
Path(conf.path.training).glob(conf.path.training_glob)
)
]
if conf.command != "preprocess":
if (
not os.path.isfile(conf.path.validation)
or not os.path.isfile(conf.path.test)
or (
conf.loader.preprocess_training
and (len(self.preprocessed_files) == 0)
)
):
raise FileNotFoundError
if not os.path.isfile(conf.path.test):
logger.warning(
f"""\
Processing testing batches, queuing {len(self.validation_chunks)} chunks."""
)
self.qfseq.queue_iterable(self.testing_chunks)
self._testing_batches = [batch for batch in self.qfseq]
torch.save(self._testing_batches, conf.path.test)
logger.warning("Testing batches pickled.")
# Override the qf seq if there is a preprocessed dataset available:
self.qfseq = qf.Sequence(*preprocessed_seq())
@property
def validation_batches(self) -> DataSetType:
......@@ -145,26 +150,72 @@ Processing testing batches, queuing {len(self.validation_chunks)} chunks."""
return self._testing_batches
def queue_epoch(self, n_skip_events=0) -> None:
n_skip_chunks = n_skip_events // conf.loader.chunksize
# Cycle Epochs
n_skip_chunks = n_skip_chunks % len(self.training_chunks)
n_skip_batches = (
n_skip_events % conf.loader.chunksize
) // conf.loader.batch_size
n_skip_epochs = n_skip_events // (
conf.loader.chunksize * len(self.training_chunks)
)
# Compute the batches on the fly
if not conf.loader.preprocess_training or conf.command == "preprocess":
# Repeat the shuffling to get the same list
for _ in range(n_skip_epochs):
np.random.shuffle(self.training_chunks)
# Cycle Epochs
n_skip_chunks = (n_skip_events // conf.loader.chunksize) % len(
self.training_chunks
)
# Only queue the chunks that are still left
epoch_chunks = self.training_chunks[n_skip_chunks:]
self.qfseq.queue_iterable(epoch_chunks)
# Now calculate the number of batches that we still have to skip,
# because a chunk may be multiple batches and we need to skip
# the ones that are already processed
n_skip_batches = (
n_skip_events % conf.loader.chunksize
) // conf.loader.batch_size
if n_skip_events != 0:
logger.info(
f"""\
Skipping {n_skip_events} events => {n_skip_chunks} chunks and {n_skip_batches} batches."""
)
epoch_chunks = self.training_chunks[n_skip_chunks:]
self.qfseq.queue_iterable(epoch_chunks)
if n_skip_batches != 0:
for _ in range(n_skip_batches):
_ = next(self.qfseq)
logger.info(f"Skipped {n_skip_batches} batches.")
np.random.shuffle(self.training_chunks)
# Load the preprocessed batches
else:
# Repeat the shuffling to get the same list
for _ in range(n_skip_epochs):
np.random.shuffle(self.preprocessed_files)
# Calculate the number of files that have already been processed
# one file contains self.n_test_batches batches
n_skip_files = (
(n_skip_events // conf.loader.batch_size) # n batches
// self.n_test_batches # by the number of batches per file
% len(self.preprocessed_files) # modulo the files per epoch
)
epoch_files = self.preprocessed_files[n_skip_files:]
self.qfseq.queue_iterable(epoch_files)
if n_skip_batches != 0:
for _ in range(n_skip_batches):
_ = next(self.qfseq)
logger.info(f"Skipped {n_skip_batches} batches.")
# Now calculate the number of batches that we still have to skip
n_skip_batches = (
(n_skip_events // conf.loader.batch_size) # n batches
) % self.n_test_batches # modulo the batches in a file
logger.info(
f"""\
Skipping {n_skip_events} events => {n_skip_files} files and {n_skip_batches} batches."""
)
if n_skip_batches != 0:
for _ in range(n_skip_batches):
_ = next(self.qfseq)
logger.info(f"Skipped {n_skip_batches} batches.")
np.random.shuffle(self.preprocessed_files)
def __iter__(self) -> qfseq:
return iter(self.qfseq)
......@@ -14,8 +14,8 @@ parser.add_argument(
subparsers = parser.add_subparsers(help="Available Commands", dest="command")
train_parser = subparsers.add_parser("train")
predict_parser = subparsers.add_parser("profile")
predict_parser = subparsers.add_parser("predict")
preprocess_parser = subparsers.add_parser("preprocess")
dump_parser = subparsers.add_parser("dump")
loadfile_parser = subparsers.add_parser("loadfile")
loadfile_parser.add_argument(
......
"""Provides the procedure to preprocess the datasets"""
from typing import List
import torch
from torch_geometric.data import Data as GraphType
from tqdm import tqdm
from fgsim.config import conf
from fgsim.io.queued_dataset import QueuedDataLoader
from fgsim.utils.logger import logger
def preprocess_procedure(
    data_loader: "QueuedDataLoader | None" = None,
) -> None:
    """Preprocess the dataset and persist the batches to disk.

    Runs the queued processing sequence over the validation and test
    chunks, saves them with ``torch.save`` to ``conf.path.validation``
    and ``conf.path.test``, and — if ``conf.loader.preprocess_training``
    is set — streams the training epoch and writes it out as numbered
    ``.pt`` files under ``conf.path.training``.

    Parameters
    ----------
    data_loader:
        Loader providing the chunk lists and the processing sequence.
        Constructed lazily when omitted, so that importing this module
        does not instantiate the (expensive) loader as a default
        argument evaluated at definition time.
    """
    if data_loader is None:
        data_loader = QueuedDataLoader()

    logger.warning(
        f"""\
Processing validation batches, queuing {len(data_loader.validation_chunks)} chunks."""
    )
    data_loader.qfseq.queue_iterable(data_loader.validation_chunks)
    validation_batches = [batch for batch in tqdm(data_loader.qfseq)]
    torch.save(validation_batches, conf.path.validation)
    logger.warning("Validation batches pickled.")

    logger.warning(
        f"""\
Processing testing batches, queuing {len(data_loader.testing_chunks)} chunks."""
    )
    data_loader.qfseq.queue_iterable(data_loader.testing_chunks)
    testing_batches = [batch for batch in tqdm(data_loader.qfseq)]
    torch.save(testing_batches, conf.path.test)
    logger.warning("Testing batches pickled.")

    if conf.loader.preprocess_training:
        logger.warning("Processing training batches")
        data_loader.queue_epoch()
        batch_list: "List[GraphType]" = []
        ifile = 0
        for batch in tqdm(data_loader.qfseq):
            # One output file holds n_test_batches batches; flush when full.
            if len(batch_list) == data_loader.n_test_batches:
                # BUG FIX: save the accumulated training batches — the
                # original saved `testing_batches` here, writing the test
                # set into every training file.
                torch.save(batch_list, f"{conf.path.training}/{ifile}.pt")
                ifile += 1
                batch_list = []
            batch_list.append(batch)
        # Flush the remaining (possibly partial) file.
        torch.save(batch_list, f"{conf.path.training}/{ifile}.pt")
......@@ -55,20 +55,19 @@
"args": [
"--tag",
"hgcal",
"--debug",
// "--debug",
"train",
]
},
{
"name": "t cnn",
"name": "preprocess hgcal",
"type": "python",
"request": "launch",
"module": "fgsim",
"args": [
"--tag",
"cnn",
// "--debug",
"train",
"hgcal",
"preprocess",
]
},
{
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment