Commit 0094b7e7 authored by mova

add grid process_seq with switch in the queued_dataset, add GoogLeNet-like CNN architecture

parent 57946796
"""
Here steps for reading the h5 files and processing the calorimeter \
images to graphs are definded. `process_seq` is the function that \
should be passed the qfseq.
"""
import h5py as h5
import numpy as np
import torch_geometric
from torch.multiprocessing import Queue
from ..config import conf
from ..geo.batch_stack import split_layer_subgraphs
from ..geo.transform import transform
from . import qf
# reading from the filesystem
def read_chunk(chunks):
data_dict = {k: [] for k in conf.loader.keylist}
for chunk in chunks:
file_path, start, end = chunk
with h5.File(file_path) as h5_file:
for k in conf.loader.keylist:
data_dict[k].append(h5_file[k][start:end])
for k in conf.loader.keylist:
if len(data_dict[k][0].shape) == 1:
data_dict[k] = np.hstack(data_dict[k])
else:
data_dict[k] = np.vstack(data_dict[k])
# split up the events and pass them as a dict
output = [
{k: data_dict[k][ientry] for k in conf.loader.keylist}
for ientry in range(conf.loader.chunksize)
]
return output
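# Illustrative sketch only: one way the (file_path, start, end) chunk tuples fed
# into read_chunk could be built. `make_chunks`, the file name and the event
# count below are hypothetical; the real chunk bookkeeping lives in the
# QueuedDataLoader further down in this commit.
def make_chunks(file_path, n_events):
    size = conf.loader.chunksize
    return [
        (file_path, start, min(start + size, n_events))
        for start in range(0, n_events, size)
    ]
# read_chunk(make_chunks("showers.h5", 3000)[:1]) then returns a list of
# conf.loader.chunksize per-event dicts keyed by conf.loader.keylist.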
def geo_batch(list_of_graphs):
batch = torch_geometric.data.Batch().from_data_list(list_of_graphs)
return batch
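# Illustration: geo_batch just wraps the standard torch_geometric collation,
# so a list of Data objects collates into a single Batch (dummy graphs below).
def _geo_batch_example():
    import torch
    from torch_geometric.data import Data
    graphs = [Data(x=torch.zeros(3, 1)), Data(x=torch.zeros(5, 1))]
    batch = geo_batch(graphs)
    assert batch.num_graphs == 2 and batch.num_nodes == 8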
def magic_do_nothing(elem):
return elem
# Collect the steps
def process_seq():
return (
qf.ProcessStep(read_chunk, 2, name="read_chunk"),
Queue(1),
        # The input is now [(x,y), ..., (x [300 * 51 * 51 * 25], y [300, 1]), (x,y)].
        # For these elements to be processed by each of the workers in the following
        # transform, they need to be (x [51 * 51 * 25], y [1]):
qf.PoolStep(
transform, nworkers=conf.loader.num_workers_transform, name="transform"
),
Queue(1),
qf.RepackStep(conf.loader.batch_size),
qf.ProcessStep(geo_batch, 1, name="geo_batch"),
qf.ProcessStep(
split_layer_subgraphs,
conf.loader.num_workers_stack,
name="split_layer_subgraphs",
),
# Needed for outputs to stay in order.
qf.ProcessStep(
magic_do_nothing,
1,
name="magic_do_nothing",
),
Queue(conf.loader.prefetch_batches),
)
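# Sketch of how the step tuple above is consumed. Constructing a qf.Sequence
# directly from the steps is an assumption; queue_iterable() and iterating the
# sequence are taken from the QueuedDataLoader further down in this commit.
def _run_sequence_example(chunks):
    qfseq = qf.Sequence(*process_seq())  # assumed constructor signature
    qfseq.queue_iterable(chunks)  # chunks: list of (file_path, start, end)
    for batch in qfseq:  # yields torch_geometric Batch objects
        yield batch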
"""Here steps for reading the h5 files and processing the calorimeter images are definded.
`process_seq` is the function that should be passed the qfseq."""
import math
import h5py as h5
import numpy as np
import torch
from torch.multiprocessing import Queue
from ..config import conf
from . import qf
# reading from the filesystem
def read_chunk(chunks):
data_dict = {k: [] for k in conf.loader.keylist}
for chunk in chunks:
file_path, start, end = chunk
with h5.File(file_path) as h5_file:
for k in conf.loader.keylist:
data_dict[k].append(h5_file[k][start:end])
for k in conf.loader.keylist:
if len(data_dict[k][0].shape) == 1:
data_dict[k] = np.hstack(data_dict[k])
else:
data_dict[k] = np.vstack(data_dict[k])
# split up the events and pass them as a dict
output = [
{k: data_dict[k][ientry] for k in conf.loader.keylist}
for ientry in range(conf.loader.chunksize)
]
return output
def stack_batch(list_of_images):
batch = {
key: torch.stack(
[torch.tensor(img[key], dtype=torch.float32) for img in list_of_images]
)
for key in conf.loader.keylist
}
return batch
def preprocessing(batch):
windowSizeECAL = 25
windowSizeHCAL = 11
inputScaleSumE = 0.01
inputScaleEta = 10.0
# ECAL slice and energy sum
ECAL = batch["ECAL"]
lowerBound = math.ceil(ECAL.shape[1] / 2) - int(math.ceil(windowSizeECAL / 2))
upperBound = lowerBound + windowSizeECAL
ECAL = ECAL[:, lowerBound:upperBound, lowerBound:upperBound]
ECAL = ECAL.contiguous().view(-1, 1, windowSizeECAL, windowSizeECAL, 25)
ECAL_sum = ECAL.sum(2).sum(2).sum(2) * inputScaleSumE
batch["ECAL"] = ECAL
batch["ECAL_sum"] = ECAL_sum
# HCAL slice to get energy sum
HCAL = batch["HCAL"]
lowerBound = math.ceil(HCAL.shape[1] / 2) - int(math.ceil(windowSizeHCAL / 2))
upperBound = lowerBound + windowSizeHCAL
HCAL = HCAL[:, lowerBound:upperBound, lowerBound:upperBound]
HCAL = HCAL.contiguous().view(-1, 1, windowSizeHCAL, windowSizeHCAL, 60)
HCAL_sum = HCAL.sum(2).sum(2).sum(2) * inputScaleSumE
del batch["HCAL"]
batch["HCAL_sum"] = HCAL_sum
# reco angles
batch["recoEta"] = batch["recoEta"].view(-1, 1) * inputScaleEta
batch["recoPhi"] = batch["recoPhi"].view(-1, 1) * inputScaleEta
return batch
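# Worked example of the centred-window slice above, assuming raw ECAL images of
# 51 x 51 x 25 cells (as the shape comments in the graph pipeline suggest);
# the HCAL case works the same way with windowSizeHCAL = 11.
def _window_example():
    full, window = 51, 25
    lower = math.ceil(full / 2) - int(math.ceil(window / 2))  # 26 - 13 = 13
    upper = lower + window  # 38
    assert (lower, upper) == (13, 38)  # ECAL[:, 13:38, 13:38] is a centred 25 x 25 window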
def magic_do_nothing(elem):
return elem
# Collect the steps
def process_seq():
return (
qf.ProcessStep(read_chunk, 4, name="read_chunk"),
qf.RepackStep(conf.loader.batch_size),
qf.ProcessStep(stack_batch, 1, name="stack_batch"),
qf.ProcessStep(preprocessing, 4, name="preprocessing"),
# Needed for outputs to stay in order.
qf.ProcessStep(
magic_do_nothing,
1,
name="magic_do_nothing",
),
Queue(conf.loader.prefetch_batches),
)
@@ -154,6 +154,8 @@ class Sequence:
queue.get(block=False)
except Empty:
break
except FileNotFoundError:
break
for istep, step in enumerate(self.steps):
logger.debug(f"Stopping sequence step {istep}")
......
"""
Provides the `QueuedDataLoader` class. The definded sequence of qf steps is \
loaded depending on `conf.loader.name`.
"""
import importlib
import os
from pathlib import Path
import h5py as h5
import numpy as np
import torch
import torch_geometric
import yaml
from torch.multiprocessing import Queue
from ..config import conf, device
from ..geo.batch_stack import split_layer_subgraphs
from ..geo.transform import transform
from ..utils.logger import logger
from . import qf
# reading from the filesystem
def read_chunk(chunkL):
data_dict = {k: [] for k in conf.loader.keylist}
for chunk in chunkL:
file_path, start, end = chunk
with h5.File(file_path) as h5_file:
for k in conf.loader.keylist:
data_dict[k].append(h5_file[k][start:end])
for k in conf.loader.keylist:
if len(data_dict[k][0].shape) == 1:
data_dict[k] = np.hstack(data_dict[k])
else:
data_dict[k] = np.vstack(data_dict[k])
# split up the events and pass them as a dict
output = [
{k: data_dict[k][ientry] for k in conf.loader.keylist}
for ientry in range(conf.loader.chunksize)
]
return output
def geo_batch(list_of_graphs):
batch = torch_geometric.data.Batch().from_data_list(list_of_graphs)
return batch
def magic_do_nothing(x):
return x
# Collect the steps
def process_seq():
return (
qf.ProcessStep(read_chunk, 2, name="read_chunk"),
Queue(1),
        # The input is now [(x,y), ..., (x [300 * 51 * 51 * 25], y [300, 1]), (x,y)].
        # For these elements to be processed by each of the workers in the following
        # transform, they need to be (x [51 * 51 * 25], y [1]):
qf.PoolStep(
transform, nworkers=conf.loader.num_workers_transform, name="transform"
),
Queue(1),
qf.RepackStep(conf.loader.batch_size),
qf.ProcessStep(geo_batch, 1, name="geo_batch"),
qf.ProcessStep(
split_layer_subgraphs,
conf.loader.num_workers_stack,
name="split_layer_subgraphs",
),
# Needed for outputs to stay in order.
qf.ProcessStep(
magic_do_nothing,
1,
name="magic_do_nothing",
),
Queue(conf.loader.prefetch_batches),
)
# Import the specified processing sequence
process_seq = importlib.import_module(
f"..io.{conf.loader.name}", "fgsim.models"
).process_seq
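# Illustration: with conf.loader.name == "cnn" (a hypothetical config value) the
# relative import above resolves within the fgsim package to the equivalent of
#
#     process_seq = importlib.import_module("fgsim.io.cnn").process_seq
#
# so switching between the graph and the image pipeline is purely a config choice.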
ds_path = Path(conf.path.dataset)
@@ -96,6 +43,12 @@ else:
class QueuedDataLoader:
"""
`QueuedDataLoader` makes `validation_batches` \
and `testing_batches` available as properties; to load training batches, one \
must queue an epoch via `queue_epoch()` and iterate over the instance of the class.
"""
def __init__(self):
chunksize = conf.loader.chunksize
batch_size = conf.loader.batch_size
@@ -164,20 +117,20 @@ class QueuedDataLoader:
if not os.path.isfile(conf.path.validation):
logger.warn(
f"""\
Processing validation batches, queuing {len(self.validation_chunks)} batches."""
Processing validation batches, queuing {len(self.validation_chunks)} chunks."""
)
self.qfseq.queue_iterable(self.validation_chunks)
self._validation_batches = [batch.contiguous() for batch in self.qfseq]
self._validation_batches = [batch for batch in self.qfseq]
torch.save(self._validation_batches, conf.path.validation)
logger.warn("Validation batches pickled.")
if not os.path.isfile(conf.path.test):
logger.warn(
f"""\
Processing testing batches, queuing {len(self.validation_chunks)} batches."""
Processing testing batches, queuing {len(self.validation_chunks)} chunks."""
)
self.qfseq.queue_iterable(self.testing_chunks)
self._testing_batches = [batch.contiguous() for batch in self.qfseq]
self._testing_batches = [batch for batch in self.qfseq]
torch.save(self._testing_batches, conf.path.test)
logger.warn("Testing batches pickled.")
@@ -201,9 +154,6 @@ Processing testing batches, queuing {len(self.validation_chunks)} batches."""
logger.warning("Finished loading.")
return self._testing_batches
def load_test_batches(self):
self.testing_batches = torch.load(conf.path.test, map_location=device)
def queue_epoch(self, n_skip_events=0):
n_skip_chunks = n_skip_events // conf.loader.chunksize
# Cycle Epochs
@@ -218,8 +168,8 @@ Processing testing batches, queuing {len(self.validation_chunks)} batches."""
f"""\
Skipping {n_skip_events} events => {n_skip_chunks} chunks and {n_skip_batches} batches."""
)
self.epoch_chunks = self.training_chunks[n_skip_chunks:]
self.qfseq.queue_iterable(self.epoch_chunks)
epoch_chunks = self.training_chunks[n_skip_chunks:]
self.qfseq.queue_iterable(epoch_chunks)
if n_skip_batches != 0:
for _ in range(n_skip_batches):
......
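# Usage sketch based on the QueuedDataLoader docstring and queue_epoch above
# (illustration only; the actual training loop is not part of this commit):
def _train_one_epoch(model):
    loader = QueuedDataLoader()
    loader.queue_epoch(n_skip_events=0)
    for batch in loader:  # iterate the instance to get training batches
        model(batch)
    return loader.validation_batches  # processed once, then pickled and reloaded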
"""
The CNN model from https://github.com/MattUnderscoreZhang/Triforce_CaloML/\
blob/master/Training/TriForce/Architectures/GoogLeNet.py
"""
import torch
import torch.nn.functional as F
from torch import nn
from ..config import conf, device
nfeatures = conf.model.dyn_features + conf.model.static_features
n_hl_features = len(conf.loader.keylist) - 2 - 1
epsilon = 1e-07
CLASSIFICATION, REGRESSION = 0, 1
class Inception(nn.Module):
def __init__(self, in_planes, n1x1, n3x3red, n3x3, n5x5red, n5x5, pool_planes):
super().__init__()
# 1x1 conv branch
self.b1 = nn.Sequential(
nn.Conv3d(in_planes, n1x1, kernel_size=1),
nn.BatchNorm3d(n1x1, eps=epsilon),
nn.ReLU(True),
)
# 1x1 conv -> 3x3 conv branch
self.b2 = nn.Sequential(
nn.Conv3d(in_planes, n3x3red, kernel_size=1),
nn.BatchNorm3d(n3x3red, eps=epsilon),
nn.ReLU(True),
nn.Conv3d(n3x3red, n3x3, kernel_size=3, padding=1),
nn.BatchNorm3d(n3x3, eps=epsilon),
nn.ReLU(True),
)
        # 1x1 conv -> 5x5 conv branch (the 5x5 is factorised into two 3x3 convs)
self.b3 = nn.Sequential(
nn.Conv3d(in_planes, n5x5red, kernel_size=1),
nn.BatchNorm3d(n5x5red, eps=epsilon),
nn.ReLU(True),
nn.Conv3d(n5x5red, n5x5, kernel_size=3, padding=1),
nn.BatchNorm3d(n5x5, eps=epsilon),
nn.ReLU(True),
nn.Conv3d(n5x5, n5x5, kernel_size=3, padding=1),
nn.BatchNorm3d(n5x5, eps=epsilon),
nn.ReLU(True),
)
# 3x3 pool -> 1x1 conv branch
self.b4 = nn.Sequential(
nn.MaxPool3d(3, stride=1, padding=1),
nn.Conv3d(in_planes, pool_planes, kernel_size=1),
nn.BatchNorm3d(pool_planes, eps=epsilon),
nn.ReLU(True),
)
def forward(self, x):
y1 = self.b1(x)
y2 = self.b2(x)
y3 = self.b3(x)
y4 = self.b4(x)
return torch.cat([y1, y2, y3, y4], 1)
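# Channel bookkeeping (illustration): the concatenation in forward() gives each
# block n1x1 + n3x3 + n5x5 + pool_planes output channels, which fixes the
# in_planes of the chained Inception blocks in ModelClass below, e.g.
assert 64 + 128 + 32 + 32 == 256  # a3 output -> b3 input
assert 128 + 192 + 96 + 64 == 480  # b3 output -> a4 input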
class ModelClass(torch.nn.Module):
def __init__(self):
super().__init__()
self.pre_layers = nn.Sequential(
nn.Conv3d(1, 192, kernel_size=3, padding=1),
nn.BatchNorm3d(192, eps=epsilon),
nn.ReLU(True),
)
self.norm = nn.InstanceNorm3d(1)
# self.norm = nn.BatchNorm3d(1)
self.a3 = Inception(192, 64, 96, 128, 16, 32, 32)
self.b3 = Inception(256, 128, 128, 192, 32, 96, 64)
self.maxpool = nn.MaxPool3d(3, stride=2, padding=1)
self.a4 = Inception(480, 192, 96, 208, 16, 48, 64)
self.b4 = Inception(512, 160, 112, 224, 24, 64, 64)
self.c4 = Inception(512, 128, 128, 256, 24, 64, 64)
self.d4 = Inception(512, 112, 144, 288, 32, 64, 64)
self.e4 = Inception(528, 256, 160, 320, 32, 128, 128)
self.a5 = Inception(832, 256, 160, 320, 32, 128, 128)
self.b5 = Inception(832, 384, 192, 384, 48, 128, 128)
self.avgpool = nn.AvgPool3d(7, stride=1)
        self.dense = nn.Linear(
            1024 + 4, 1024
        )  # 1024 pooled features plus recoPhi, recoEta, ECAL_sum and HCAL_sum
        self.linear = nn.Linear(1024 + 5, 1)  # output layer: the same four extras plus a constant ones column
def forward(self, batch):
ECAL = batch["ECAL"]
ECAL_sum = batch["ECAL_sum"]
HCAL_sum = batch["HCAL_sum"]
recoEta = batch["recoEta"]
recoPhi = batch["recoPhi"]
# net
x = self.norm(ECAL)
x = self.pre_layers(x)
x = self.a3(x)
x = self.b3(x)
x = self.maxpool(x)
x = self.a4(x)
x = self.b4(x)
x = self.c4(x)
x = self.d4(x)
x = self.e4(x)
x = self.maxpool(x)
x = self.a5(x)
x = self.b5(x)
x = self.avgpool(x)
x = x.view(x.size(0), -1)
# cat angles / energy sums in before dense layer
x = torch.cat([x, recoPhi, recoEta, ECAL_sum, HCAL_sum], 1)
x = F.relu(self.dense(x))
# cat angles / energy sums back in before final layer
x = torch.cat(
[
x,
recoPhi,
recoEta,
ECAL_sum,
HCAL_sum,
torch.ones([batch["ECAL"].shape[0], 1], device=device),
],
1,
)
x = self.linear(x)
# preparing output
return x
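# Shape sketch (illustration, assuming the 25 x 25 x 25 ECAL window produced by
# the cnn loader's preprocessing and a CPU device in the config; dummy tensors):
def _forward_shape_example():
    dummy = {
        "ECAL": torch.zeros(2, 1, 25, 25, 25),
        "ECAL_sum": torch.zeros(2, 1),
        "HCAL_sum": torch.zeros(2, 1),
        "recoEta": torch.zeros(2, 1),
        "recoPhi": torch.zeros(2, 1),
    }
    out = ModelClass()(dummy)
    assert out.shape == (2, 1)  # one regression value per event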
from typing import Dict
import torch
import torch_geometric
from ..config import device
from ..utils.typecheck import istype
def move_batch_to_device(batch, device):
@@ -19,18 +22,23 @@ def move_batch_to_device(batch, device):
else:
raise ValueError
batch_new = torch_geometric.data.Batch().from_dict(
{k: move(v) for k, v in batch.to_dict().items()}
)
for attr in [
"__slices__",
"__cat_dims__",
"__cumsum__",
"__num_nodes_list__",
"__num_graphs__",
]:
if hasattr(batch_new, attr):
setattr(batch_new, attr, move(getattr(batch, attr)))
if isinstance(batch, torch_geometric.data.Data):
batch_new = torch_geometric.data.Batch().from_dict(
{k: move(v) for k, v in batch.to_dict().items()}
)
for attr in [
"__slices__",
"__cat_dims__",
"__cumsum__",
"__num_nodes_list__",
"__num_graphs__",
]:
if hasattr(batch_new, attr):
setattr(batch_new, attr, move(getattr(batch, attr)))
elif istype(batch, Dict[str, torch.Tensor]):
batch_new = {k: move(v) for k, v in batch.items()}
else:
raise RuntimeError("Cannot move this object to the torch device.")
return batch_new
......
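# Usage sketch for the new dict branch (illustration; dummy tensors):
def _move_dict_example():
    batch = {"ECAL": torch.zeros(2, 1, 25, 25, 25), "recoEta": torch.zeros(2, 1)}
    return move_batch_to_device(batch, device)  # hits the Dict[str, Tensor] branch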
from typeguard import check_type
def istype(obj, objtype) -> bool:
try:
check_type("foo", obj, objtype)
return True
except TypeError:
return False
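# Example (illustration): the exact check move_batch_to_device now relies on.
def _istype_example():
    from typing import Dict
    import torch
    assert istype({"x": torch.zeros(1)}, Dict[str, torch.Tensor])
    assert not istype([1, 2, 3], Dict[str, torch.Tensor])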
@@ -48,14 +48,14 @@
"version": "0.2.0",
"configurations": [
{
"name": "t lin",
"name": "t cnn",
"type": "python",
"request": "launch",
"module": "fgsim",
"args": [
"--tag",
"linreg",
"--debug",
"cnn",
// "--debug",
"train",
]
},
......