diff --git a/pes_to_spec/model.py b/pes_to_spec/model.py
index 01890d1829b4abb67f0481f5a42fc2c9afe8d61f..613bd3a41bc93b79377cd3b7cb2b57185aedeb7e 100644
--- a/pes_to_spec/model.py
+++ b/pes_to_spec/model.py
@@ -20,6 +20,8 @@ from sklearn.model_selection import train_test_split
 from sklearn.base import clone, MetaEstimatorMixin
 from joblib import Parallel, delayed
 
+from functools import partial
+import multiprocessing as mp
 
 from typing import Any, Dict, List, Optional, Union, Tuple
 
@@ -343,7 +345,7 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
         self.tof_start = tof_start
         self.delta_tof = delta_tof
 
-    def transform(self, X: Dict[str, np.ndarray]) -> np.ndarray:
+    def transform(self, X: Dict[str, np.ndarray], keep_dictionary_structure: bool = False) -> Union[Dict[str, np.ndarray], np.ndarray]:
         """
         Get a dictionary with the channel names for the input low-resolution data and output
         only the relevant input data in an array.
@@ -351,18 +353,20 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
 
         Args:
             X: Dictionary with keys named channel_{i}_{k},
               where i is a number between 1 and 4 and k is a letter between A and D.
+            keep_dictionary_structure: Whether to concatenate all channels or keep them as a dictionary.
 
-        Returns: Concatenated and pre-processed low-resolution data of shape (train_id, features).
+        Returns: Concatenated and pre-processed low-resolution data of shape (train_id, features),
+                 or a dictionary of per-channel arrays if keep_dictionary_structure is True.
         """
         if self.tof_start is None:
             raise NotImplementedError("The low-resolution data cannot be transformed before the prompt has been identified. Call the fit function first.")
-        items = [X[k] for k in self.channels]
+        # keep a fixed channel order so that the concatenated feature layout is reproducible
+        y = {channel: X[channel] for channel in self.channels}
         if self.delta_tof is not None:
-            items = [item[:, self.tof_start:(self.tof_start + self.delta_tof)] for item in items]
-        else:
-            items = [item[:, self.tof_start:] for item in items]
-        cat = np.concatenate(items, axis=1)
-        return cat
+            # select a window of +- delta_tof samples around the prompt, clipped to the data range
+            first = max(0, self.tof_start - self.delta_tof)
+            last = min(X[self.channels[0]].shape[1], self.tof_start + self.delta_tof)
+            y = {channel: item[:, first:last] for channel, item in y.items()}
+        if not keep_dictionary_structure:
+            return np.concatenate(list(y.values()), axis=1)
+        return y
 
     def estimate_prompt_peak(self, X: Dict[str, np.ndarray]) -> int:
         """
@@ -416,8 +420,8 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
             import matplotlib.pyplot as plt
             fig = plt.figure(figsize=(8, 16))
             ax = plt.gca()
-            ax.plot(np.arange(peak_idx-100, peak_idx+300),
-                    sum_low_res[peak_idx-100:peak_idx+300],
+            ax.plot(np.arange(peak_idx-300, peak_idx+300),
+                    sum_low_res[peak_idx-300:peak_idx+300],
                     c="b",
                     label="Data")
             ax.set(title="",
@@ -541,6 +545,11 @@ class Model(TransformerMixin, BaseEstimator):
         #self.fit_model = FitModel()
         self.fit_model = MultiOutputWithStd(ARDRegression(n_iter=30, tol=1e-4, verbose=True))
 
+        # per-channel PCA models, used only for the per-channel consistency check
+        self.channel_pca_model = {channel: Pipeline([('pca', PCA(n_pca_lr, whiten=True)),
+                                                     ('unc', UncertaintyHolder()),
+                                                     ])
+                                  for channel in channels}
+
         # size of the test subset
         self.validation_size = validation_size
 
@@ -593,18 +602,23 @@ class Model(TransformerMixin, BaseEstimator):
 
         Returns: Smoothened high resolution spectrum.
         """
+        print("Fitting PCA on low-resolution data.")
         x_t = self.x_model.fit_transform(low_res_data)
+        print("Fitting PCA on high-resolution data.")
         y_t = self.y_model.fit_transform(high_res_data, smoothen__energy=high_res_photon_energy)
         #self.fit_model.set_params(fex__gamma=1.0/float(x_t.shape[0]))
+        print("Fitting model.")
         self.fit_model.fit(x_t, y_t)
 
         # calculate the effect of the PCA
+        print("Calculate PCA unc. on high-resolution data.")
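+        # quantify the PCA truncation error: reconstruct the smoothened spectra from
+        # their PCA projection and store the RMS residual per energy bin as the
+        # systematic uncertainty of the reconstruction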
         high_res = self.y_model['smoothen'].transform(high_res_data)
         high_pca = self.y_model['pca'].transform(high_res)
         high_pca_rec = self.y_model['pca'].inverse_transform(high_pca)
         high_pca_unc = np.sqrt(np.mean((high_res - high_pca_rec)**2, axis=0, keepdims=True))
         self.y_model['unc'].set_uncertainty(high_pca_unc)
+        print("Calculate PCA unc. on low-resolution data.")
         low_res = self.x_model['select'].transform(low_res_data)
         pca_model = self.x_model['pca']
         if 'fex' in self.x_model.named_steps:
@@ -614,8 +628,58 @@ class Model(TransformerMixin, BaseEstimator):
         low_pca_unc = np.mean(np.sqrt(np.mean((low_res - low_pca_rec)**2, axis=1, keepdims=True)), axis=0, keepdims=True)
         self.x_model['unc'].set_uncertainty(low_pca_unc)
 
+        # fit one PCA per channel for the consistency check in check_compatibility_per_channel
+        print("Calculate PCA per channel on low-resolution data.")
+        selection_model = self.x_model['select']
+        low_res = selection_model.transform(low_res_data, keep_dictionary_structure=True)
+        for channel in self.get_channels():
+            print(f"Calculate PCA on {channel}")
+            low_pca = self.channel_pca_model[channel].named_steps["pca"].fit_transform(low_res[channel])
+            low_pca_rec = self.channel_pca_model[channel].named_steps["pca"].inverse_transform(low_pca)
+            low_pca_unc = np.mean(np.sqrt(np.mean((low_res[channel] - low_pca_rec)**2, axis=1, keepdims=True)), axis=0, keepdims=True)
+            self.channel_pca_model[channel]['unc'].set_uncertainty(low_pca_unc)
+        print("End of fit.")
+
         return high_res
 
+    def get_channel_quality(self, channel: str, low_res: Dict[str, np.ndarray], channel_pca_model: Dict[str, Pipeline]) -> np.ndarray:
+        """
+        Get the compatibility factor for a single channel.
+
+        Args:
+            channel: The channel name.
+            low_res: The low-resolution data in a dictionary.
+            channel_pca_model: The per-channel PCA models with their stored uncertainties.
+
+        Returns: The compatibility factor per train ID.
+        """
+        pca_model = channel_pca_model[channel].named_steps['pca']
+        low_pca = pca_model.transform(low_res[channel])
+        low_pca_rec = pca_model.inverse_transform(low_pca)
+        low_pca_unc = channel_pca_model[channel].named_steps['unc'].uncertainty()
+        low_pca_dev = np.sqrt(np.mean((low_res[channel] - low_pca_rec)**2, axis=1, keepdims=True))
+        return low_pca_dev / low_pca_unc
+
+    def check_compatibility_per_channel(self, low_res_data: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
+        """
+        Check if a new low-resolution data source is compatible with the one used in training by
+        comparing the effect of the trained PCA model on it, channel by channel.
+
+        Args:
+            low_res_data: Low-resolution data, as in the fit step: a dictionary mapping channel
+                          names to arrays of shape (train_id, ToF channel).
+
+        Returns: Per-channel ratio of the root-mean-squared error of the PCA reconstruction
+                 on the new data to the reconstruction uncertainty estimated during training.
+        """
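+        # each ratio compares the PCA reconstruction error on the new data with the
+        # error stored during training: values close to 1 indicate data consistent
+        # with the training conditions, larger values flag a problematic channel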
+        selection_model = self.x_model['select']
+        low_res = selection_model.transform(low_res_data, keep_dictionary_structure=True)
+        channels = list(low_res.keys())
+        # a parallel version with multiprocessing is kept here for reference:
+        #with mp.Pool(len(low_res.keys())) as p:
+        #    values = p.map(partial(self.get_channel_quality, low_res=low_res, channel_pca_model=self.channel_pca_model), channels)
+        values = map(partial(self.get_channel_quality, low_res=low_res, channel_pca_model=self.channel_pca_model), channels)
+        quality = dict(zip(channels, values))
+        return quality
+
     def check_compatibility(self, low_res_data: Dict[str, np.ndarray]) -> np.ndarray:
         """
         Check if a new low-resolution data source is compatible with the one used in training, by
@@ -689,8 +753,9 @@ class Model(TransformerMixin, BaseEstimator):
         """
         joblib.dump([self.x_model,
                      self.y_model,
-                     self.fit_model],
-                    filename, compress='zlib')
+                     self.fit_model,
+                     self.channel_pca_model],
+                    filename, compress='zlib')
 
     @staticmethod
     def load(filename: str) -> Model:
         """
@@ -702,10 +767,11 @@ class Model(TransformerMixin, BaseEstimator):
 
         Returns: A new model object.
         """
-        x_model, y_model, fit_model = joblib.load(filename)
+        x_model, y_model, fit_model, channel_pca_model = joblib.load(filename)
         obj = Model()
         obj.x_model = x_model
         obj.y_model = y_model
         obj.fit_model = fit_model
+        obj.channel_pca_model = channel_pca_model
         return obj
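For reviewers, a minimal sketch of how the new per-channel check is meant to be used. Only Model, fit, save, load and check_compatibility_per_channel come from this patch; the exact fit signature is inferred from its body, and the array shapes and synthetic data below are placeholders, not real beamline data.

import numpy as np
from pes_to_spec.model import Model

# 16 channels named channel_{i}_{k}, as documented in SelectRelevantLowResolution
channels = [f"channel_{i}_{k}" for i in range(1, 5) for k in "ABCD"]

# synthetic stand-ins: (train_id, ToF channel) traces per channel and
# (train_id, energy bin) high-resolution spectra on a photon-energy axis
rng = np.random.default_rng(0)
low_res = {ch: rng.normal(size=(100, 5000)) for ch in channels}
high_res = rng.normal(size=(100, 1000))
energy = np.linspace(9990.0, 10010.0, 1000)

model = Model()
model.fit(low_res, high_res, high_res_photon_energy=energy)
model.save("model.joblib")

# later, on new data: ratios close to 1 mean the channel still resembles the
# training data, larger values flag a drifting or misbehaving channel
model = Model.load("model.joblib")
new_data = {ch: rng.normal(size=(10, 5000)) for ch in channels}
quality = model.check_compatibility_per_channel(new_data)
for ch, ratio in quality.items():
    print(ch, float(np.mean(ratio)))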