diff --git a/pes_to_spec/__init__.py b/pes_to_spec/__init__.py index e42e7bdca220e7075fad74fc92f4aa507a0f76a6..9656fa30cb220100277700f0a41479a478cb4c82 100644 --- a/pes_to_spec/__init__.py +++ b/pes_to_spec/__init__.py @@ -2,4 +2,4 @@ Estimate high-resolution photon spectrometer data from low-resolution non-invasive measurements. """ -VERSION = "0.0.2" +VERSION = "0.0.3" diff --git a/pes_to_spec/model.py b/pes_to_spec/model.py index 613bd3a41bc93b79377cd3b7cb2b57185aedeb7e..891637ef43121b40213b97a3ba48a449cbf76afc 100644 --- a/pes_to_spec/model.py +++ b/pes_to_spec/model.py @@ -22,6 +22,7 @@ from sklearn.base import clone, MetaEstimatorMixin from joblib import Parallel, delayed from functools import partial import multiprocessing as mp +from copy import deepcopy from typing import Any, Dict, List, Optional, Union, Tuple @@ -323,6 +324,62 @@ class UncertaintyHolder(TransformerMixin, BaseEstimator): def uncertainty(self): """The uncertainty recorded.""" return self.unc + +class DataHolder(TransformerMixin, BaseEstimator): + """ + Keep track of relevance dictionaries. + + """ + def __init__(self, data: Dict[str, Any]=dict()): + self.data: Dict[str, Any] = data + + def set_data(self, data: Dict[str, Any]): + """ + Set. + + Args: + data: Data. + """ + self.data = deepcopy(data) + + def get_data(self) -> Dict[str, Any]: + """ + Get. + + """ + return self.data + + def fit(self, X, y=None) -> TransformerMixin: + """ + Does nothing. + + Args: + X: Irrelevant. + y: Irrelevant. + + Returns: Itself. + """ + return self + + def transform(self, X: np.ndarray) -> np.ndarray: + """ + Identity map. + + Args: + X: The input. + """ + return X + + def inverse_transform(self, X: np.ndarray) -> np.ndarray: + """ + Identity map. + + Args: + X: The input. + """ + return X + + class SelectRelevantLowResolution(TransformerMixin, BaseEstimator): """ @@ -545,10 +602,8 @@ class Model(TransformerMixin, BaseEstimator): #self.fit_model = FitModel() self.fit_model = MultiOutputWithStd(ARDRegression(n_iter=30, tol=1e-4, verbose=True)) - self.channel_pca_model = {channel: Pipeline([('pca', PCA(n_pca_lr, whiten=True)), - ('unc', UncertaintyHolder()), - ]) - for channel in channels} + self.channel_mean = {ch: np.nan for ch in channels} + self.channel_relevance = {ch: np.nan for ch in channels} # size of the test subset self.validation_size = validation_size @@ -629,20 +684,28 @@ class Model(TransformerMixin, BaseEstimator): self.x_model['unc'].set_uncertainty(low_pca_unc) # for consistency check per channel - print("Calculate PCA per channel on low-resolution data.") selection_model = self.x_model['select'] - low_res = selection_model.transform(low_res_data, keep_dictionary_structure=True) for channel in self.get_channels(): - print(f"Calculate PCA on {channel}") - low_pca = self.channel_pca_model[channel].named_steps["pca"].fit_transform(low_res[channel]) - low_pca_rec = self.channel_pca_model[channel].named_steps["pca"].inverse_transform(low_pca) - low_pca_unc = np.mean(np.sqrt(np.mean((low_res[channel] - low_pca_rec)**2, axis=1, keepdims=True)), axis=0, keepdims=True) - self.channel_pca_model[channel]['unc'].set_uncertainty(low_pca_unc) + self.channel_mean[channel] = np.mean(low_res_data[channel], axis=0, keepdims=True) + print(f"Calculate PCA relevance on {channel}") + # freeze input data in one channel only + low_res_data_frozen = {ch: low_res_data[ch] if ch != channel + else np.repeat(self.channel_mean[channel], low_res_data[ch].shape[0], axis=0) + for ch in self.get_channels()} + low_res = selection_model.transform(low_res_data_frozen) + low_pca = pca_model.fit_transform(low_res) + low_pca_rec = pca_model.inverse_transform(low_pca) + low_pca_unc = np.mean(np.sqrt(np.mean((low_res - low_pca_rec)**2, axis=1, keepdims=True)), axis=0, keepdims=True) + self.channel_relevance[channel] = low_pca_unc print("End of fit.") return high_res - def get_channel_quality(self, channel: str, low_res: Dict[str, np.ndarray], channel_pca_model: Dict[str, Pipeline]) -> float: + def get_channel_quality(self, channel: str, low_res_data: Dict[str, np.ndarray], + pca_model: PCA, + channel_relevance: Dict[str, float], + selection_model: SelectRelevantLowResolution, + channel_mean: Dict[str, np.ndarray]) -> float: """ Get the compatibility for a single channel. @@ -653,11 +716,15 @@ class Model(TransformerMixin, BaseEstimator): Returns: the compatibility factor. """ - pca_model = channel_pca_model[channel].named_steps['pca'] - low_pca = pca_model.transform(low_res[channel]) + # freeze input data in one channel only + low_res_data_frozen = {ch: low_res_data[ch] if ch != channel + else np.repeat(channel_mean[channel], low_res_data[ch].shape[0], axis=0) + for ch in low_res_data.keys()} + low_res_selected = selection_model.transform(low_res_data_frozen) + low_pca = pca_model.transform(low_res_selected) low_pca_rec = pca_model.inverse_transform(low_pca) - low_pca_unc = channel_pca_model[channel].named_steps['unc'].uncertainty() - low_pca_dev = np.sqrt(np.mean((low_res[channel] - low_pca_rec)**2, axis=1, keepdims=True)) + low_pca_unc = channel_relevance[channel] + low_pca_dev = np.sqrt(np.mean((low_res_selected - low_pca_rec)**2, axis=1, keepdims=True)) return low_pca_dev/low_pca_unc def check_compatibility_per_channel(self, low_res_data: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: @@ -671,12 +738,19 @@ class Model(TransformerMixin, BaseEstimator): Returns: Ratio of root-mean-squared-error of the data reconstruction using the existing PCA model and the one from the original model per channel. """ selection_model = self.x_model['select'] - low_res = selection_model.transform(low_res_data, keep_dictionary_structure=True) - quality = {channel: 0.0 for channel in low_res.keys()} - channels = list(low_res.keys()) - #with mp.Pool(len(low_res.keys())) as p: - # values = p.map(partial(self.get_channel_quality, low_res=low_res, channel_pca_model=self.channel_pca_model), channels) - values = map(partial(self.get_channel_quality, low_res=low_res, channel_pca_model=self.channel_pca_model), channels) + quality = {channel: 0.0 for channel in low_res_data.keys()} + channels = list(low_res_data.keys()) + pca_model = self.x_model['pca'] + if 'fex' in self.x_model.named_steps: + pca_model = self.x_model['fex'].named_steps['prepca'] + #with mp.Pool(len(low_res_data.keys())) as p: + values = map(partial(self.get_channel_quality, + low_res_data=low_res_data, + pca_model=pca_model, + channel_relevance=self.channel_relevance, + selection_model=selection_model, + channel_mean=self.channel_mean + ), channels) quality = dict(zip(channels, values)) return quality @@ -754,7 +828,8 @@ class Model(TransformerMixin, BaseEstimator): joblib.dump([self.x_model, self.y_model, self.fit_model, - self.channel_pca_model + DataHolder(self.channel_mean), + DataHolder(self.channel_relevance) ], filename, compress='zlib') @staticmethod @@ -767,11 +842,12 @@ class Model(TransformerMixin, BaseEstimator): Returns: A new model object. """ - x_model, y_model, fit_model, channel_pca_model = joblib.load(filename) + x_model, y_model, fit_model, channel_mean, channel_relevance = joblib.load(filename) obj = Model() obj.x_model = x_model obj.y_model = y_model obj.fit_model = fit_model - obj.channel_pca_model = channel_pca_model + obj.channel_mean = channel_mean.get_data() + obj.channel_relevance = channel_relevance.get_data() return obj diff --git a/pes_to_spec/test/offline_analysis.py b/pes_to_spec/test/offline_analysis.py index 782aab6a66bde9ed02b9c56d93ea76879cfcd55f..6db5415c6def669ebc50cc2adfe708929e0d2372 100755 --- a/pes_to_spec/test/offline_analysis.py +++ b/pes_to_spec/test/offline_analysis.py @@ -198,6 +198,8 @@ def main(): start = time_ns() rmse = model.check_compatibility(pes_raw_t) print("Consistency check RMSE ratios:", rmse) + rmse = model.check_compatibility_per_channel(pes_raw_t) + print("Consistency per channel check RMSE ratios:", rmse) t += [time_ns() - start] t_names += ["Consistency"]