diff --git a/pes_to_spec/model.py b/pes_to_spec/model.py
index 01890d1829b4abb67f0481f5a42fc2c9afe8d61f..564beaba948b346a78248a1c627905886a1cf048 100644
--- a/pes_to_spec/model.py
+++ b/pes_to_spec/model.py
@@ -343,7 +343,7 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
         self.tof_start = tof_start
         self.delta_tof = delta_tof
 
-    def transform(self, X: Dict[str, np.ndarray]) -> np.ndarray:
+    def transform(self, X: Dict[str, np.ndarray], keep_dictionary_structure: bool=False) -> np.ndarray:
         """
         Get a dictionary with the channel names for the inut low resolution data and output
         only the relevant input data in an array.
@@ -351,18 +351,20 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
         Args:
           X: Dictionary with keys named channel_{i}_{k},
              where i is a number between 1 and 4 and k is a letter between A and D.
+          keep_dictionary_structure: Whether to concatenate all channels, or keep them as a dictionary.
 
         Returns: Concatenated and pre-processed low-resolution data of shape (train_id, features).
         """
         if self.tof_start is None:
             raise NotImplementedError("The low-resolution data cannot be transformed before the prompt has been identified. Call the fit function first.")
-        items = [X[k] for k in self.channels]
+        y = X
         if self.delta_tof is not None:
-            items = [item[:, self.tof_start:(self.tof_start + self.delta_tof)] for item in items]
-        else:
-            items = [item[:, self.tof_start:] for item in items]
-        cat = np.concatenate(items, axis=1)
-        return cat
+            first = max(0, self.tof_start - self.delta_tof)
+            last = min(X[self.channels[0]].shape[1], self.tof_start + self.delta_tof)
+            y = {channel: item[:, first:last] for channel, item in X.items()}
+        if not keep_dictionary_structure:
+            return np.concatenate(list(y.values()), axis=1)
+        return y
 
     def estimate_prompt_peak(self, X: Dict[str, np.ndarray]) -> int:
         """
@@ -416,8 +418,8 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
             import matplotlib.pyplot as plt
             fig = plt.figure(figsize=(8, 16))
             ax = plt.gca()
-            ax.plot(np.arange(peak_idx-100, peak_idx+300),
-                    sum_low_res[peak_idx-100:peak_idx+300],
+            ax.plot(np.arange(peak_idx-300, peak_idx+300),
+                    sum_low_res[peak_idx-300:peak_idx+300],
                     c="b",
                     label="Data")
             ax.set(title="",
@@ -541,6 +543,11 @@ class Model(TransformerMixin, BaseEstimator):
         #self.fit_model = FitModel()
         self.fit_model = MultiOutputWithStd(ARDRegression(n_iter=30, tol=1e-4, verbose=True))
 
+        self.channel_pca_model = {channel: Pipeline([('pca', PCA(n_pca_lr, whiten=True)),
+                                                     ('unc', UncertaintyHolder()),
+                                                     ])
+                                  for channel in channels}
+
         # size of the test subset
         self.validation_size = validation_size
 
@@ -614,8 +621,40 @@ class Model(TransformerMixin, BaseEstimator):
         low_pca_unc = np.mean(np.sqrt(np.mean((low_res - low_pca_rec)**2, axis=1, keepdims=True)), axis=0, keepdims=True)
         self.x_model['unc'].set_uncertainty(low_pca_unc)
 
+        # fit one PCA model per channel for the per-channel consistency check
+        selection_model = self.x_model['select']
+        low_res = selection_model.transform(low_res_data, keep_dictionary_structure=True)
+        for channel in self.get_channels():
+            pca_model = self.channel_pca_model[channel].named_steps["pca"]
+            low_pca = pca_model.fit_transform(low_res[channel])
+            low_pca_rec = pca_model.inverse_transform(low_pca)
+            low_pca_unc = np.mean(np.sqrt(np.mean((low_res[channel] - low_pca_rec)**2, axis=1, keepdims=True)), axis=0, keepdims=True)
+            self.channel_pca_model[channel]['unc'].set_uncertainty(low_pca_unc)
+
         return high_res
 
+    def check_compatibility_per_channel(self, low_res_data: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
+        """
+        Check whether a new low-resolution data source is compatible with the one used in
+        training by comparing the effect of the trained PCA model on it, per channel.
+
+        Args:
+          low_res_data: Low resolution data as in the fit step with shape (train_id, channel, ToF channel).
+
+        Returns: Per-channel ratio between the root-mean-squared error of the PCA reconstruction of the new data and the value recorded at training time.
+        """
+        selection_model = self.x_model['select']
+        low_res = selection_model.transform(low_res_data, keep_dictionary_structure=True)
+        quality = {channel: 0.0 for channel in low_res.keys()}
+        for channel in low_res.keys():
+            pca_model = self.channel_pca_model[channel].named_steps['pca']
+            low_pca = pca_model.transform(low_res[channel])
+            low_pca_rec = pca_model.inverse_transform(low_pca)
+            low_pca_unc = self.channel_pca_model[channel].named_steps['unc'].uncertainty()
+            low_pca_dev = np.sqrt(np.mean((low_res[channel] - low_pca_rec)**2, axis=1, keepdims=True))
+            quality[channel] = low_pca_dev/low_pca_unc
+        return quality
+
     def check_compatibility(self, low_res_data: Dict[str, np.ndarray]) -> np.ndarray:
         """
         Check if a new low-resolution data source is compatible with the one used in training, by
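A minimal standalone sketch of the new windowing in SelectRelevantLowResolution.transform, which now keeps a region centred on the prompt rather than starting at it. The toy arrays, shapes, and parameter values below are illustrative assumptions, not part of the diff:

import numpy as np

# Toy input: two channels of 10 ToF samples for 3 train IDs.
X = {f"channel_1_{k}": np.random.randn(3, 10) for k in "AB"}

# With tof_start=5 and delta_tof=2 the new code keeps the window
# [tof_start - delta_tof, tof_start + delta_tof), clipped to the data range.
tof_start, delta_tof = 5, 2
first = max(0, tof_start - delta_tof)                         # -> 3
last = min(X["channel_1_A"].shape[1], tof_start + delta_tof)  # -> 7
y = {channel: item[:, first:last] for channel, item in X.items()}

# keep_dictionary_structure=False concatenates channels along the feature axis,
# as before; keep_dictionary_structure=True would return `y` itself.
cat = np.concatenate(list(y.values()), axis=1)
assert cat.shape == (3, 8)  # 2 channels x 4 samples each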
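To make the consistency-check arithmetic concrete, here is a self-contained sketch of what fit() stores and what check_compatibility_per_channel computes for a single channel. The synthetic data, component count, and noise level are arbitrary assumptions:

import numpy as np
from sklearn.decomposition import PCA

# Synthetic stand-in for one channel: training data, plus "new" data with extra noise.
rng = np.random.default_rng(0)
train = rng.normal(size=(100, 40))
new = train[:10] + 0.5 * rng.normal(size=(10, 40))

pca = PCA(10, whiten=True).fit(train)

def rmse(data):
    # Root-mean-squared error of the PCA reconstruction, per train ID.
    rec = pca.inverse_transform(pca.transform(data))
    return np.sqrt(np.mean((data - rec) ** 2, axis=1, keepdims=True))

unc = np.mean(rmse(train), axis=0, keepdims=True)  # what the 'unc' step stores in fit()
ratio = rmse(new) / unc                            # what check_compatibility_per_channel returns
print(ratio.mean())  # noticeably above 1 for the noisier "new" data

In the diff itself this ratio is computed per train ID and per channel, so a single misbehaving channel can be flagged without rejecting the whole data source.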