From 74dbe4b5f9ca92fbf15f2b976a48eb6573161601 Mon Sep 17 00:00:00 2001 From: Danilo Ferreira de Lima <danilo.enoque.ferreira.de.lima@xfel.de> Date: Wed, 1 Mar 2023 16:02:24 +0100 Subject: [PATCH] Separated deconvolution. Created simplified model for the outlier test. --- pes_to_spec/__init__.py | 2 +- pes_to_spec/model.py | 121 +++++++++++++++++++++++---- pes_to_spec/test/offline_analysis.py | 6 +- 3 files changed, 111 insertions(+), 18 deletions(-) diff --git a/pes_to_spec/__init__.py b/pes_to_spec/__init__.py index d6daec4..0494dc2 100644 --- a/pes_to_spec/__init__.py +++ b/pes_to_spec/__init__.py @@ -2,4 +2,4 @@ Estimate high-resolution photon spectrometer data from low-resolution non-invasive measurements. """ -VERSION = "0.1.9" +VERSION = "0.2.0" diff --git a/pes_to_spec/model.py b/pes_to_spec/model.py index 5536d2a..0408fc7 100644 --- a/pes_to_spec/model.py +++ b/pes_to_spec/model.py @@ -9,13 +9,13 @@ from scipy.optimize import fmin_l_bfgs_b from sklearn.decomposition import PCA from sklearn.base import TransformerMixin, BaseEstimator from sklearn.base import RegressorMixin +from sklearn.base import OutlierMixin from sklearn.pipeline import Pipeline from sklearn.kernel_approximation import Nystroem #from sklearn.linear_model import ARDRegression from sklearn.linear_model import BayesianRidge -from scipy.stats import gaussian_kde -#from sklearn.ensemble import IsolationForest from sklearn.covariance import EllipticEnvelope +from scipy.stats import gaussian_kde from functools import reduce from itertools import product @@ -647,6 +647,84 @@ class MultiOutputWithStd(MetaEstimatorMixin, BaseEstimator): return np.asarray(y).T + +class UncorrelatedDeviation(OutlierMixin, BaseEstimator): + """ + Detect outliers from uncorrelated whitened mean-centered inputs. + """ + def __init__(self, contamination: float=0.0000005): + super().__init__() + self.contamination = contamination + + def fit(self, X, y=None) -> OutlierMixin: + """ + Set the outlier threshold from the norms of the training samples. 
+ + Args: + X: Training samples; their norms define the threshold. + y: Ignored. + + Returns: Itself. + """ + self.dist_ = self.score_samples(X) + self.offset_ = -np.percentile(-self.dist_, 100.0 * self.contamination) + return self + + def decision_function(self, X: np.ndarray) -> np.ndarray: + """ + Return the decision function (negative values mark outliers). + """ + return self.offset_ - self.score_samples(X) + + def score_samples(self, X: np.ndarray) -> np.ndarray: + """ + Return the Euclidean norm of each sample (larger means more anomalous). + + Args: + X: The new input data, assumed whitened and mean-centered. + + Returns: The norm, i.e. the Mahalanobis distance for whitened inputs. + """ + return np.sqrt(np.sum(X**2, axis=1)) + + def predict(self, X): + """ + Predict labels (1 inlier, -1 outlier) of X according to fitted model. + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + The data matrix. + Returns + ------- + is_inlier : ndarray of shape (n_samples,) + Returns -1 for anomalies/outliers and +1 for inliers. + """ + values = self.decision_function(X) + is_inlier = np.full(values.shape[0], -1, dtype=int) + is_inlier[values >= 0] = 1 + + return is_inlier + + def score(self, X, y, sample_weight=None): + """Return the mean accuracy on the given test data and labels. + The labels follow the convention of predict(): 1 marks an inlier + and -1 marks an outlier, so the score is the (optionally weighted) + share of samples whose predicted label matches y. + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + Test samples. + y : array-like of shape (n_samples,) + True labels for X (1 for inliers, -1 for outliers). + sample_weight : array-like of shape (n_samples,), default=None + Sample weights. + Returns + ------- + score : float + Mean accuracy of self.predict(X) w.r.t. y. 
+ """ + return accuracy_score(y, self.predict(X), sample_weight=sample_weight) + class Model(TransformerMixin, BaseEstimator): """ Object representing a previous fit of the model to be used to predict high-resolution @@ -703,7 +781,9 @@ class Model(TransformerMixin, BaseEstimator): ]) #self.ood = {ch: IsolationForest(n_jobs=-1) # for ch in channels+['full']} - self.ood = {ch: EllipticEnvelope() + #self.ood = {ch: UncorrelatedDeviation(contamination=0.003) + # for ch in channels+['full']} + self.ood = {ch: EllipticEnvelope(contamination=0.003) for ch in channels+['full']} #self.fit_model = MultiOutputWithStd(ARDRegression(n_iter=300, tol=1e-8, verbose=True), n_jobs=8) self.fit_model = MultiOutputWithStd(BayesianRidge(n_iter=300, tol=1e-8, verbose=True), n_jobs=8) @@ -838,11 +918,11 @@ class Model(TransformerMixin, BaseEstimator): y_pred, n = self.fit_model.predict(x_t[inliers], return_std=True) z = self.y_model['pca'].inverse_transform(y_pred) - #n = np.sqrt((self.y_model['pca'].inverse_transform(y_pred + n) - z)**2 + high_pca_unc**2) + n = np.sqrt((self.y_model['pca'].inverse_transform(y_pred + n) - z)**2 + high_pca_unc**2) e = high_res_photon_energy[0,:] if len(high_res_photon_energy.shape) == 2 else high_res_photon_energy Z = np.fft.fft(z) - #V = np.fft.fft(np.mean(n, axis=0)) + V = np.fft.fft(np.mean(n, axis=0)) de = e[1] - e[0] E = np.fft.fftfreq(len(e), de) @@ -854,7 +934,7 @@ class Model(TransformerMixin, BaseEstimator): gaussian_ft = np.fft.fft(gaussian) H = np.mean(Z/D, axis=0) - N = np.absolute(gaussian_ft)**2 + N = np.absolute(gaussian_ft*V)**2 S = np.mean(np.absolute(D)**2, axis=0) H2 = np.absolute(H)**2 nonzero = np.absolute(H) > 0.2 @@ -954,9 +1034,16 @@ class Model(TransformerMixin, BaseEstimator): channels = list(low_res_data.keys()) # check if each channel is close to the mean low_res_selected = selection_model.transform(low_res_data, keep_dictionary_structure=True) - low_pca = {ch: self.channel_pca[ch].transform(low_res_selected[ch]) - for ch in 
channels} - return {ch: self.ood[ch].predict(low_pca[ch]) for ch in channels} + + def is_inlier(in_data, ch: str) -> np.ndarray: + data_pca = self.channel_pca[ch].transform(in_data) + return self.ood[ch].predict(data_pca) + + #result = Parallel(n_jobs=-1)( + # delayed(is_inlier)(low_res_selected[ch], ch) for ch in channels + #) + #result = dict(result) + return {ch: is_inlier(low_res_selected[ch], ch) for ch in channels} def check_compatibility(self, low_res_data: Dict[str, np.ndarray]) -> np.ndarray: """ @@ -1011,17 +1098,23 @@ class Model(TransformerMixin, BaseEstimator): pca_unc = self.y_model['unc'].uncertainty() total_unc = np.sqrt(pca_unc**2 + unc**2) - M = self.wiener_filter.shape[0] - assert expected.shape[1] == M - deconvolved = np.real(np.absolute(np.fft.ifft(np.fft.fft(expected, axis=1) * self.wiener_filter_ft.reshape(1, -1)))) - return dict(expected=expected, unc=unc, pca=pca_unc, total_unc=total_unc, - deconvolved=deconvolved, ) + def deconvolve(self, expected: np.ndarray) -> np.ndarray: + """ + Return Wiener filter deconvolved spectrum. + + Args: + expected: The predicted spectrum. + + Returns: The Wiener filter-corrected spectrum. + """ + return np.real(np.absolute(np.fft.ifft(np.fft.fft(expected, axis=1) * self.wiener_filter_ft.reshape(1, -1)))) + def save(self, filename: str): """ Save the fit model in a file. diff --git a/pes_to_spec/test/offline_analysis.py b/pes_to_spec/test/offline_analysis.py index 0abf224..46d41e6 100755 --- a/pes_to_spec/test/offline_analysis.py +++ b/pes_to_spec/test/offline_analysis.py @@ -68,7 +68,7 @@ def plot_result(filename: str, pes: Optional[np.ndarray]=None, pes_to_show: Optional[str]="", pes_bin: Optional[np.ndarray]=None, - wiener: Optional[np.ndarray]=None): + ): """ Plot result with uncertainty band. @@ -81,7 +81,6 @@ def plot_result(filename: str, pes: PES spectrum for the inset. pes_to_show: Name of the channel shown. pes_bin: PES bins. - wiener: A Wiener filter to use to improve the filter estimate. 
""" fig = plt.figure(figsize=(12, 8)) @@ -283,6 +282,7 @@ def main(): print("Predict") start = time_ns() spec_pred = model.predict(pes_raw) + spec_pred["deconvolved"] = model.deconvolve(spec_pred["expected"]) t += [time_ns() - start] t_names += ["Predict"] @@ -454,7 +454,7 @@ def main(): #pes=-pes_raw[pes_to_show][idx, first:last], #pes_to_show=pes_to_show.replace('_', ' '), #pes_bin=np.arange(first, last), - wiener=model.wiener_filter + #wiener=model.wiener_filter ) for ch in channels: plot_pes(os.path.join(args.directory, f"test_pes_{tid}_{ch}.png"), -- GitLab