From 08453c3c57308f78593d63c7dbafab8d6f0f79a5 Mon Sep 17 00:00:00 2001 From: Danilo Ferreira de Lima <danilo.enoque.ferreira.de.lima@xfel.de> Date: Wed, 21 Dec 2022 14:54:46 +0100 Subject: [PATCH] Clean up. --- pes_to_spec/model.py | 49 +++++++++++++--------------- pes_to_spec/test/offline_analysis.py | 21 +++++++----- 2 files changed, 34 insertions(+), 36 deletions(-) diff --git a/pes_to_spec/model.py b/pes_to_spec/model.py index 767c575..67fd2d2 100644 --- a/pes_to_spec/model.py +++ b/pes_to_spec/model.py @@ -9,6 +9,7 @@ from sklearn.decomposition import PCA, IncrementalPCA from sklearn.model_selection import train_test_split from sklearn.base import TransformerMixin, BaseEstimator from itertools import product +from time import time_ns import matplotlib.pyplot as plt @@ -142,7 +143,7 @@ class Model(TransformerMixin, BaseEstimator): self.hr_pca = PCA(n_pca_hr, whiten=True) # PCA unc. in high resolution - self.high_pca_unc: Optional[np.ndarray] = None + self.high_pca_unc: np.ndarray = np.zeros((1, 0), dtype=float) # fit model self.fit_model = FitModel() @@ -296,16 +297,22 @@ class Model(TransformerMixin, BaseEstimator): low_pca = self.lr_pca.fit_transform(low_res) high_pca = self.hr_pca.fit_transform(high_res) # split in train and test for PCA uncertainty evaluation - low_pca_train, low_pca_test, high_pca_train, high_pca_test = train_test_split(low_pca, high_pca, test_size=self.validation_size, random_state=42) + (low_pca_train, low_pca_test, + high_pca_train, high_pca_test) = train_test_split(low_pca, high_pca, + test_size=self.validation_size, + random_state=42) # fit the linear model - self.fit_model.fit(low_pca_train, high_pca_train, low_pca_test, high_pca_test) + self.fit_model.fit(low_pca_train, + high_pca_train, + low_pca_test, + high_pca_test) high_pca_rec = self.hr_pca.inverse_transform(high_pca) self.high_pca_unc = np.sqrt(np.mean((high_res - high_pca_rec)**2, axis=0, keepdims=True)) return high_res - def predict(self, low_res_data: Dict[str, np.ndarray]) -> np.ndarray: + def predict(self, low_res_data: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: """ Predict a high-resolution spectrum from a low resolution given one. The output includes the uncertainty in its second and third entries of the first dimension. @@ -313,25 +320,22 @@ class Model(TransformerMixin, BaseEstimator): Args: low_res_data: Low resolution data as in the fit step with shape (train_id, channel, ToF channel). - Returns: High resolution data with shape (train_id, ToF channel, 3). - The component 0 of the last dimension is the predicted spectrum. - Components 1 and 2 correspond to two sources of uncertainty. + Returns: High resolution data with shape (train_id, energy channel) in a dictionary containing + the expected prediction in key "expected", the stat. uncertainty in key "unc" and + a (1, energy channel) array for the PCA syst. uncertainty in key "pca". """ low_res = self.preprocess_low_res(low_res_data) low_pca = self.lr_pca.transform(low_res) - n_trains = low_res.shape[0] # Get high res. high_pca = self.fit_model.predict(low_pca) - high_res_predicted = self.hr_pca.inverse_transform(high_pca["Y"]) - n_high_res_features = high_res_predicted.shape[1] - high_res_unc = (self.hr_pca.inverse_transform(high_pca["Y"] + high_pca["Y_eps"]) - - high_res_predicted) - result = np.stack((high_res_predicted, - high_res_unc, - np.broadcast_to(self.high_pca_unc, - (n_trains, n_high_res_features))), - axis=2) - return result + n_trains = low_pca.shape[0] + pca_y = np.concatenate((high_pca["Y"], high_pca["Y"] + high_pca["Y_eps"]), axis=0) + high_res_predicted = self.hr_pca.inverse_transform(pca_y) + expected = high_res_predicted[:n_trains, :] + unc = high_res_predicted[n_trains:, :] - expected + return dict(expected=expected, + unc=unc, + pca=self.high_pca_unc) def save(self, filename: str): """ @@ -557,14 +561,5 @@ class FitModel(object): result["Y_unc"] = self.u_inf[0,:] # input-dependent uncertainty result["Y_eps"] = np.exp(X @ self.A_eps + result["Y_unc"]) - - #self.result["res"] = self.model["spec_pca_model"].inverse_transform(self.result["res_pca"]) # transform PCA space to real space - #self.result["res_unc"] = self.model["spec_pca_model"].inverse_transform(self.result["res_pca"] + self.model["u_inf"]) - self.model["spec_pca_model"].inverse_transform(self.result["res_pca"] ) - #self.result["res_unc"] = np.fabs(self.result["res_unc"]) - #self.result["res_eps"] = self.model["spec_pca_model"].inverse_transform(self.result["res_pca"] + self.result["res_pca_eps"]) - self.model["spec_pca_model"].inverse_transform(self.result["res_pca"] ) - #self.result["res_eps"] = np.fabs(self.result["res_eps"]) - #self.Yhat_pca = self.model["spec_pca_model"].inverse_transform(self.model["Y_test"]) - #self.result["res_unc_specpca"] = np.sqrt(((self.Yhat_pca - self.model["spec_target"])**2).mean(axis=0)) - #self.result["res_unc_total"] = np.sqrt(self.result["res_eps"]**2 + self.result["res_unc_specpca"]**2) return result diff --git a/pes_to_spec/test/offline_analysis.py b/pes_to_spec/test/offline_analysis.py index 2a165ce..848ca88 100755 --- a/pes_to_spec/test/offline_analysis.py +++ b/pes_to_spec/test/offline_analysis.py @@ -15,7 +15,7 @@ matplotlib.use('Agg') import matplotlib.pyplot as plt from matplotlib.gridspec import GridSpec -from typing import Optional +from typing import Dict, Optional from time import time_ns import pandas as pd @@ -40,13 +40,13 @@ def plot_pes(filename: str, pes_raw_int: np.ndarray): fig.savefig(filename) plt.close(fig) -def plot_result(filename: str, spec_pred: np.ndarray, spec_smooth: np.ndarray, spec_raw_pe: np.ndarray, spec_raw_int: Optional[np.ndarray]=None): +def plot_result(filename: str, spec_pred: Dict[str, np.ndarray], spec_smooth: np.ndarray, spec_raw_pe: np.ndarray, spec_raw_int: Optional[np.ndarray]=None): """ Plot result with uncertainty band. Args: filename: Output file name. - spec_pred: Predicted result with uncertainty bands in a shape of (3, features). + spec_pred: Predicted result with uncertainty bands in a dictionary. spec_smooth: Smoothened expected result with shape (features,). spec_raw_pe: x axis with the photon energy in eV. spec_raw_int: Original true expected result with shape (features,). @@ -55,15 +55,16 @@ def plot_result(filename: str, spec_pred: np.ndarray, spec_smooth: np.ndarray, s fig = plt.figure(figsize=(16, 8)) gs = GridSpec(1, 1) ax = fig.add_subplot(gs[0, 0]) - eps = np.mean(spec_pred[:, 1]) + unc_stat = np.mean(spec_pred["unc"]) + unc_pca = np.mean(spec_pred["pca"]) ax.plot(spec_raw_pe, spec_smooth, c='b', lw=3, label="High-resolution measurement (smoothened)") - ax.plot(spec_raw_pe, spec_pred[:, 0], c='r', lw=3, label="High-resolution prediction") - ax.fill_between(spec_raw_pe, spec_pred[:, 0] - spec_pred[:, 1], spec_pred[:, 0] + spec_pred[:, 1], facecolor='red', alpha=0.6, label="68% unc. (stat.)") - ax.fill_between(spec_raw_pe, spec_pred[:, 0] - spec_pred[:, 2], spec_pred[:, 0] + spec_pred[:, 2], facecolor='magenta', alpha=0.6, label="68% unc. (syst., PCA)") + ax.plot(spec_raw_pe, spec_pred["expected"], c='r', lw=3, label="High-resolution prediction") + ax.fill_between(spec_raw_pe, spec_pred["expected"] - spec_pred["unc"], spec_pred["expected"] + spec_pred["unc"], facecolor='red', alpha=0.6, label="68% unc. (stat.)") + ax.fill_between(spec_raw_pe, spec_pred["expected"] - spec_pred["pca"], spec_pred["expected"] + spec_pred["pca"], facecolor='magenta', alpha=0.6, label="68% unc. (syst., PCA)") if spec_raw_int is not None: ax.plot(spec_raw_pe, spec_raw_int, c='b', lw=1, ls='--', label="High-resolution measurement") ax.legend() - ax.set(title=f"avg(unc) = {eps}", + ax.set(title=f"avg(stat unc) = {unc_stat}, avg(pca unc) = {unc_pca}", xlabel="Photon energy [eV]", ylabel="Intensity") fig.savefig(filename) @@ -167,7 +168,9 @@ def main(): for tid in test_tids: idx = np.where(tid==tids)[0][0] plot_result(f"test_{tid}.png", - spec_pred[idx, :, :], + {k: item[idx, ...] if k != "pca" + else item[0, ...] + for k, item in spec_pred.items()}, spec_smooth[idx, :], spec_raw_pe[idx, :], spec_raw_int[idx, :]) -- GitLab