Skip to content
Snippets Groups Projects

Check consistency per channel

Merged Danilo Enoque Ferreira de Lima requested to merge meeting-jan-31 into main
1 file
+ 78
12
Compare changes
  • Side-by-side
  • Inline
+ 78
12
@@ -20,6 +20,8 @@ from sklearn.model_selection import train_test_split
from sklearn.base import clone, MetaEstimatorMixin
from joblib import Parallel, delayed
from functools import partial
import multiprocessing as mp
from typing import Any, Dict, List, Optional, Union, Tuple
@@ -343,7 +345,7 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
self.tof_start = tof_start
self.delta_tof = delta_tof
def transform(self, X: Dict[str, np.ndarray]) -> np.ndarray:
def transform(self, X: Dict[str, np.ndarray], keep_dictionary_structure: bool=False) -> np.ndarray:
"""
Get a dictionary with the channel names for the inut low resolution data and output
only the relevant input data in an array.
@@ -351,18 +353,20 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
Args:
X: Dictionary with keys named channel_{i}_{k},
where i is a number between 1 and 4 and k is a letter between A and D.
keep_dictionary_structure: Whether to concatenate all channels, or keep them as a dictionary.
Returns: Concatenated and pre-processed low-resolution data of shape (train_id, features).
"""
if self.tof_start is None:
raise NotImplementedError("The low-resolution data cannot be transformed before the prompt has been identified. Call the fit function first.")
items = [X[k] for k in self.channels]
y = X
if self.delta_tof is not None:
items = [item[:, self.tof_start:(self.tof_start + self.delta_tof)] for item in items]
else:
items = [item[:, self.tof_start:] for item in items]
cat = np.concatenate(items, axis=1)
return cat
first = max(0, self.tof_start - self.delta_tof)
last = min(X[self.channels[0]].shape[1], self.tof_start + self.delta_tof)
y = {channel: item[:, first:last] for channel, item in X.items()}
if not keep_dictionary_structure:
return np.concatenate(list(y.values()), axis=1)
return y
def estimate_prompt_peak(self, X: Dict[str, np.ndarray]) -> int:
"""
@@ -416,8 +420,8 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
import matplotlib.pyplot as plt
fig = plt.figure(figsize=(8, 16))
ax = plt.gca()
ax.plot(np.arange(peak_idx-100, peak_idx+300),
sum_low_res[peak_idx-100:peak_idx+300],
ax.plot(np.arange(peak_idx-300, peak_idx+300),
sum_low_res[peak_idx-300:peak_idx+300],
c="b",
label="Data")
ax.set(title="",
@@ -541,6 +545,11 @@ class Model(TransformerMixin, BaseEstimator):
#self.fit_model = FitModel()
self.fit_model = MultiOutputWithStd(ARDRegression(n_iter=30, tol=1e-4, verbose=True))
self.channel_pca_model = {channel: Pipeline([('pca', PCA(n_pca_lr, whiten=True)),
('unc', UncertaintyHolder()),
])
for channel in channels}
# size of the test subset
self.validation_size = validation_size
@@ -593,18 +602,23 @@ class Model(TransformerMixin, BaseEstimator):
Returns: Smoothened high resolution spectrum.
"""
print("Fitting PCA on low-resolution data.")
x_t = self.x_model.fit_transform(low_res_data)
print("Fitting PCA on high-resolution data.")
y_t = self.y_model.fit_transform(high_res_data, smoothen__energy=high_res_photon_energy)
#self.fit_model.set_params(fex__gamma=1.0/float(x_t.shape[0]))
print("Fitting model.")
self.fit_model.fit(x_t, y_t)
# calculate the effect of the PCA
print("Calculate PCA unc. on high-resolution data.")
high_res = self.y_model['smoothen'].transform(high_res_data)
high_pca = self.y_model['pca'].transform(high_res)
high_pca_rec = self.y_model['pca'].inverse_transform(high_pca)
high_pca_unc = np.sqrt(np.mean((high_res - high_pca_rec)**2, axis=0, keepdims=True))
self.y_model['unc'].set_uncertainty(high_pca_unc)
print("Calculate PCA unc. on low-resolution data.")
low_res = self.x_model['select'].transform(low_res_data)
pca_model = self.x_model['pca']
if 'fex' in self.x_model.named_steps:
@@ -614,8 +628,58 @@ class Model(TransformerMixin, BaseEstimator):
low_pca_unc = np.mean(np.sqrt(np.mean((low_res - low_pca_rec)**2, axis=1, keepdims=True)), axis=0, keepdims=True)
self.x_model['unc'].set_uncertainty(low_pca_unc)
# for consistency check per channel
print("Calculate PCA per channel on low-resolution data.")
selection_model = self.x_model['select']
low_res = selection_model.transform(low_res_data, keep_dictionary_structure=True)
for channel in self.get_channels():
print(f"Calculate PCA on {channel}")
low_pca = self.channel_pca_model[channel].named_steps["pca"].fit_transform(low_res[channel])
low_pca_rec = self.channel_pca_model[channel].named_steps["pca"].inverse_transform(low_pca)
low_pca_unc = np.mean(np.sqrt(np.mean((low_res[channel] - low_pca_rec)**2, axis=1, keepdims=True)), axis=0, keepdims=True)
self.channel_pca_model[channel]['unc'].set_uncertainty(low_pca_unc)
print("End of fit.")
return high_res
def get_channel_quality(self, channel: str, low_res: Dict[str, np.ndarray], channel_pca_model: Dict[str, Pipeline]) -> float:
"""
Get the compatibility for a single channel.
Args:
channel: The channel.
low_res: The data in a dictionary.
pca_model: The PCA model.
Returns: the compatibility factor.
"""
pca_model = channel_pca_model[channel].named_steps['pca']
low_pca = pca_model.transform(low_res[channel])
low_pca_rec = pca_model.inverse_transform(low_pca)
low_pca_unc = channel_pca_model[channel].named_steps['unc'].uncertainty()
low_pca_dev = np.sqrt(np.mean((low_res[channel] - low_pca_rec)**2, axis=1, keepdims=True))
return low_pca_dev/low_pca_unc
def check_compatibility_per_channel(self, low_res_data: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
"""
Check if a new low-resolution data source is compatible with the one used in training, by
comparing the effect of the trained PCA model on it, but do it per channel.
Args:
low_res_data: Low resolution data as in the fit step with shape (train_id, channel, ToF channel).
Returns: Ratio of root-mean-squared-error of the data reconstruction using the existing PCA model and the one from the original model per channel.
"""
selection_model = self.x_model['select']
low_res = selection_model.transform(low_res_data, keep_dictionary_structure=True)
quality = {channel: 0.0 for channel in low_res.keys()}
channels = list(low_res.keys())
#with mp.Pool(len(low_res.keys())) as p:
# values = p.map(partial(self.get_channel_quality, low_res=low_res, channel_pca_model=self.channel_pca_model), channels)
values = map(partial(self.get_channel_quality, low_res=low_res, channel_pca_model=self.channel_pca_model), channels)
quality = dict(zip(channels, values))
return quality
def check_compatibility(self, low_res_data: Dict[str, np.ndarray]) -> np.ndarray:
"""
Check if a new low-resolution data source is compatible with the one used in training, by
@@ -689,8 +753,9 @@ class Model(TransformerMixin, BaseEstimator):
"""
joblib.dump([self.x_model,
self.y_model,
self.fit_model],
filename, compress='zlib')
self.fit_model,
self.channel_pca_model
], filename, compress='zlib')
@staticmethod
def load(filename: str) -> Model:
@@ -702,10 +767,11 @@ class Model(TransformerMixin, BaseEstimator):
Returns: A new model object.
"""
x_model, y_model, fit_model = joblib.load(filename)
x_model, y_model, fit_model, channel_pca_model = joblib.load(filename)
obj = Model()
obj.x_model = x_model
obj.y_model = y_model
obj.fit_model = fit_model
obj.channel_pca_model = channel_pca_model
return obj
Loading