Skip to content
Snippets Groups Projects
Commit 001c98ab authored by Danilo Ferreira de Lima's avatar Danilo Ferreira de Lima
Browse files

Checking compatibility per channel.

parent a7323718
No related branches found
No related tags found
1 merge request!5Check consistency per channel
......@@ -343,7 +343,7 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
self.tof_start = tof_start
self.delta_tof = delta_tof
def transform(self, X: Dict[str, np.ndarray]) -> np.ndarray:
def transform(self, X: Dict[str, np.ndarray], keep_dictionary_structure: bool=False) -> np.ndarray:
"""
Get a dictionary with the channel names for the inut low resolution data and output
only the relevant input data in an array.
......@@ -351,18 +351,20 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
Args:
X: Dictionary with keys named channel_{i}_{k},
where i is a number between 1 and 4 and k is a letter between A and D.
keep_dictionary_structure: Whether to concatenate all channels, or keep them as a dictionary.
Returns: Concatenated and pre-processed low-resolution data of shape (train_id, features).
"""
if self.tof_start is None:
raise NotImplementedError("The low-resolution data cannot be transformed before the prompt has been identified. Call the fit function first.")
items = [X[k] for k in self.channels]
y = X
if self.delta_tof is not None:
items = [item[:, self.tof_start:(self.tof_start + self.delta_tof)] for item in items]
else:
items = [item[:, self.tof_start:] for item in items]
cat = np.concatenate(items, axis=1)
return cat
first = max(0, self.tof_start - self.delta_tof)
last = min(X[self.channels[0]].shape[1], self.tof_start + self.delta_tof)
y = {channel: item[:, first:last] for channel, item in X.items()}
if not keep_dictionary_structure:
return np.concatenate(list(y.values()), axis=1)
return y
def estimate_prompt_peak(self, X: Dict[str, np.ndarray]) -> int:
"""
......@@ -416,8 +418,8 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
import matplotlib.pyplot as plt
fig = plt.figure(figsize=(8, 16))
ax = plt.gca()
ax.plot(np.arange(peak_idx-100, peak_idx+300),
sum_low_res[peak_idx-100:peak_idx+300],
ax.plot(np.arange(peak_idx-300, peak_idx+300),
sum_low_res[peak_idx-300:peak_idx+300],
c="b",
label="Data")
ax.set(title="",
......@@ -541,6 +543,11 @@ class Model(TransformerMixin, BaseEstimator):
#self.fit_model = FitModel()
self.fit_model = MultiOutputWithStd(ARDRegression(n_iter=30, tol=1e-4, verbose=True))
self.channel_pca_model = {channel: Pipeline([('pca', PCA(n_pca_lr, whiten=True)),
('unc', UncertaintyHolder()),
])
for channel in channels}
# size of the test subset
self.validation_size = validation_size
......@@ -614,8 +621,40 @@ class Model(TransformerMixin, BaseEstimator):
low_pca_unc = np.mean(np.sqrt(np.mean((low_res - low_pca_rec)**2, axis=1, keepdims=True)), axis=0, keepdims=True)
self.x_model['unc'].set_uncertainty(low_pca_unc)
# for consistency check per channel
selection_model = self.x_model['select']
low_res = selection_model.transform(low_res_data, keep_dictionary_structure=True)
for channel in self.get_channels():
pca_model = self.channel_pca_model[channel].named_steps["pca"]
low_pca = pca_model.transform(low_res)
low_pca_rec = pca_model.inverse_transform(low_pca)
low_pca_unc = np.mean(np.sqrt(np.mean((low_res - low_pca_rec)**2, axis=1, keepdims=True)), axis=0, keepdims=True)
self.channel_pca_model[channel]['unc'].set_uncertainty(low_pca_unc)
return high_res
def check_compatibility_per_channel(self, low_res_data: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
"""
Check if a new low-resolution data source is compatible with the one used in training, by
comparing the effect of the trained PCA model on it, but do it per channel.
Args:
low_res_data: Low resolution data as in the fit step with shape (train_id, channel, ToF channel).
Returns: Ratio of root-mean-squared-error of the data reconstruction using the existing PCA model and the one from the original model per channel.
"""
selection_model = self.x_model['select']
low_res = selection_model.transform(low_res_data, keep_dictionary_structure=True)
pca_model = self.channel_pca_model.named_steps['pca']
quality = {channel: 0.0 for channel in low_res.keys()}
for channel in low_res.keys():
low_pca = pca_model.transform(low_res[channel])
low_pca_rec = pca_model.inverse_transform(low_pca)
low_pca_unc = self.channel_pca_model.named_steps['unc'].uncertainty()
low_pca_dev = np.sqrt(np.mean((low_res[channel] - low_pca_rec)**2, axis=1, keepdims=True))
quality[channel] = low_pca_dev/low_pca_unc
return quality
def check_compatibility(self, low_res_data: Dict[str, np.ndarray]) -> np.ndarray:
"""
Check if a new low-resolution data source is compatible with the one used in training, by
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment