From e84aae6a518170e9c36d0555573693d0705c234a Mon Sep 17 00:00:00 2001 From: Danilo Ferreira de Lima <danilo.enoque.ferreira.de.lima@xfel.de> Date: Fri, 17 Feb 2023 18:09:28 +0100 Subject: [PATCH] Use per channel PCA for consistency check. Original fit model has better uncertainties, when compared to ARD regression: switching to it. --- pes_to_spec/model.py | 351 ++++++++++++++++++++++--------------------- pyproject.toml | 2 +- 2 files changed, 177 insertions(+), 176 deletions(-) diff --git a/pes_to_spec/model.py b/pes_to_spec/model.py index ba514a8..9bd444a 100644 --- a/pes_to_spec/model.py +++ b/pes_to_spec/model.py @@ -13,6 +13,7 @@ from sklearn.base import TransformerMixin, BaseEstimator from sklearn.base import RegressorMixin from sklearn.kernel_approximation import Nystroem from sklearn.linear_model import ARDRegression +from sklearn.preprocessing import PolynomialFeatures #from sklearn.svm import LinearSVR #from sklearn.gaussian_process import GaussianProcessRegressor #from sklearn.gaussian_process.kernels import DotProduct, WhiteKernel @@ -34,160 +35,156 @@ from typing import Any, Dict, List, Optional, Union, Tuple # * using evidence maximization as an iterative method # * automatic relevance determination to reduce overtraining # -#from autograd import numpy as anp -#from autograd import grad -#class FitModel(RegressorMixin, BaseEstimator): -# """ -# Linear regression model with uncertainties. -# -# Args: -# l: Regularization coefficient. -# """ -# def __init__(self, l: float=1e-6): -# self.l = l -# -# # model parameter sizes -# self.Nx: int = 0 -# self.Ny: int = 0 -# -# # fit result -# self.A_inf: Optional[np.ndarray] = None -# self.b_inf: Optional[np.ndarray] = None -# self.u_inf: Optional[np.ndarray] = None -# -# # fit monitoring -# self.loss_train: List[float] = list() -# self.loss_test: List[float] = list() -# -# self.input_data = None -# -# def fit(self, X: np.ndarray, y: np.ndarray, **fit_params) -> RegressorMixin: -# """ -# Perform the fit and evaluate uncertainties with the test set. -# -# Args: -# X: The input. -# y: The target. -# fit_params: If it contains X_test and y_test, they are used to validate the model. -# -# Returns: The object itself. -# """ -# if 'X_test' in fit_params and 'y_test' in fit_params: -# X_test = fit_params['X_test'] -# y_test = fit_params['y_test'] -# else: -# X_test = X -# y_test = y -# -# # model parameter sizes -# self.Nx: int = int(X.shape[1]) -# self.Ny: int = int(y.shape[1]) -# -# # initial parameter values -# A0: np.ndarray = np.eye(self.Nx, self.Ny).reshape(self.Nx*self.Ny) -# b0: np.ndarray = np.zeros(self.Ny) -# Aeps: np.ndarray = np.zeros(self.Nx) -# u0: np.ndarray = np.zeros(self.Ny) -# x0: np.ndarray = np.concatenate((A0, b0, u0, Aeps), axis=0) -# -# # reset loss monitoring -# self.loss_train: List[float] = list() -# self.loss_test: List[float] = list() -# -# def loss(x: np.ndarray, X: np.ndarray, Y: np.ndarray) -> float: -# """ -# Calculate the loss function value for a given parameter set `x`. -# -# Args: -# x: The parameters as a flat array. -# X: The independent-variable dataset. -# Y: The dependent-variable dataset. -# -# Returns: The loss. 
-# """ -# # diag( (in @ x - out) @ (in @ x - out)^T ) -# A = x[:self.Nx*self.Ny].reshape((self.Nx, self.Ny)) -# b = x[self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape((1, self.Ny)) -# -# b_eps = x[(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape((1, self.Ny)) -# A_eps = x[(self.Nx*self.Ny+self.Ny+self.Ny):].reshape((self.Nx, 1)) -# log_unc = anp.matmul(X, A_eps) + b_eps -# -# #log_unc = anp.log(anp.exp(log_unc) + anp.exp(log_eps)) -# iunc2 = anp.exp(-2*log_unc) -# -# L = anp.mean( (0.5*((anp.matmul(X, A) + b - Y)**2)*iunc2 + log_unc).sum(axis=1), axis=0 ) -# weights2 = (anp.sum(anp.square(A.ravel())) -# #+ anp.sum(anp.square(b.ravel())) -# #+ anp.sum(anp.square(A_eps.ravel())) -# #+ anp.sum(anp.square(b_eps.ravel())) -# ) -# return L + self.l/2 * weights2 -# -# def loss_history(x: np.ndarray) -> float: -# """ -# Calculate the loss function and keep a history of it in training and testing. -# -# Args: -# x: Parameters flattened out. -# -# Returns: The loss value. -# """ -# l_train = loss(x, X, y) -# l_test = loss(x, X_test, y_test) -# -# self.loss_train += [l_train] -# self.loss_test += [l_test] -# return l_train -# -# def loss_train(x: np.ndarray) -> float: -# """ -# Calculate the loss function for the training dataset. -# -# Args: -# x: Parameters flattened out. -# -# Returns: The loss value. -# """ -# l_train = loss(x, X, y) -# return l_train -# -# grad_loss = grad(loss_train) -# sc_op = fmin_l_bfgs_b(loss_history, -# x0, -# grad_loss, -# disp=True, -# #factr=1e7, -# factr=10, -# maxiter=100, -# iprint=0) -# -# # Inference -# self.A_inf = sc_op[0][:self.Nx*self.Ny].reshape(self.Nx, self.Ny) -# self.b_inf = sc_op[0][self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape(1, self.Ny) -# self.u_inf = sc_op[0][(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape(1, self.Ny) # removed np.exp -# self.A_eps = sc_op[0][(self.Nx*self.Ny+self.Ny+self.Ny):].reshape(self.Nx, 1) -# -# def predict(self, X: np.ndarray, return_std: bool=False) -> Union[Tuple[np.ndarray, np.ndarray], np.ndarray]: -# """ -# Predict y from X. -# -# Args: -# X: Input dataset. -# -# Returns: Predicted Y and, if return_std is True, also its uncertainty. -# """ -# -# # result -# y = (X @ self.A_inf + self.b_inf) -# if not return_std: -# return y -# # flat uncertainty -# y_unc = self.u_inf[0,:] -# # input-dependent uncertainty -# y_eps = np.exp(X @ self.A_eps + y_unc) -# return y, y_eps -# +from autograd import numpy as anp +from autograd import grad +class FitModel(RegressorMixin, BaseEstimator): + """ + Linear regression model with uncertainties. + + Args: + l: Regularization coefficient. + """ + def __init__(self, l: float=1e-6): + self.l = l + + # model parameter sizes + self.Nx: int = 0 + self.Ny: int = 0 + + # fit result + self.A_inf: Optional[np.ndarray] = None + self.b_inf: Optional[np.ndarray] = None + self.u_inf: Optional[np.ndarray] = None + + # fit monitoring + self.loss_train: List[float] = list() + self.loss_test: List[float] = list() + + self.input_data = None + + def fit(self, X: np.ndarray, y: np.ndarray, **fit_params) -> RegressorMixin: + """ + Perform the fit and evaluate uncertainties with the test set. + + Args: + X: The input. + y: The target. + fit_params: If it contains X_test and y_test, they are used to validate the model. + + Returns: The object itself. 
+ """ + if 'X_test' in fit_params and 'y_test' in fit_params: + X_test = fit_params['X_test'] + y_test = fit_params['y_test'] + else: + X_test = X + y_test = y + + # model parameter sizes + self.Nx: int = int(X.shape[1]) + self.Ny: int = int(y.shape[1]) + + # initial parameter values + A0: np.ndarray = np.eye(self.Nx, self.Ny).reshape(self.Nx*self.Ny) + b0: np.ndarray = np.zeros(self.Ny) + Aeps: np.ndarray = np.zeros(self.Nx) + u0: np.ndarray = np.zeros(self.Ny) + x0: np.ndarray = np.concatenate((A0, b0, u0, Aeps), axis=0) + + # reset loss monitoring + self.loss_train: List[float] = list() + self.loss_test: List[float] = list() + + def loss(x: np.ndarray, X: np.ndarray, Y: np.ndarray) -> float: + """ + Calculate the loss function value for a given parameter set `x`. + + Args: + x: The parameters as a flat array. + X: The independent-variable dataset. + Y: The dependent-variable dataset. + + Returns: The loss. + """ + # diag( (in @ x - out) @ (in @ x - out)^T ) + A = x[:self.Nx*self.Ny].reshape((self.Nx, self.Ny)) + b = x[self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape((1, self.Ny)) + + b_eps = x[(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape((1, self.Ny)) + A_eps = x[(self.Nx*self.Ny+self.Ny+self.Ny):].reshape((self.Nx, 1)) + log_unc = anp.matmul(X, A_eps) + b_eps + + #log_unc = anp.log(anp.exp(log_unc) + anp.exp(log_eps)) + iunc2 = anp.exp(-2*log_unc) + + L = anp.mean( (0.5*((anp.matmul(X, A) + b - Y)**2)*iunc2 + log_unc).sum(axis=1), axis=0 ) + #weights2 = anp.sum(anp.square(A.ravel())) + return L #+ self.l/2 * weights2 + + def loss_history(x: np.ndarray) -> float: + """ + Calculate the loss function and keep a history of it in training and testing. + + Args: + x: Parameters flattened out. + + Returns: The loss value. + """ + l_train = loss(x, X, y) + l_test = loss(x, X_test, y_test) + + self.loss_train += [l_train] + self.loss_test += [l_test] + return l_train + + def loss_train(x: np.ndarray) -> float: + """ + Calculate the loss function for the training dataset. + + Args: + x: Parameters flattened out. + + Returns: The loss value. + """ + l_train = loss(x, X, y) + return l_train + + grad_loss = grad(loss_train) + sc_op = fmin_l_bfgs_b(loss_history, + x0, + grad_loss, + disp=True, + #factr=1e7, + factr=10, + maxiter=100, + iprint=0) + + # Inference + self.A_inf = sc_op[0][:self.Nx*self.Ny].reshape(self.Nx, self.Ny) + self.b_inf = sc_op[0][self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape(1, self.Ny) + self.u_inf = sc_op[0][(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape(1, self.Ny) # removed np.exp + self.A_eps = sc_op[0][(self.Nx*self.Ny+self.Ny+self.Ny):].reshape(self.Nx, 1) + + def predict(self, X: np.ndarray, return_std: bool=False) -> Union[Tuple[np.ndarray, np.ndarray], np.ndarray]: + """ + Predict y from X. + + Args: + X: Input dataset. + + Returns: Predicted Y and, if return_std is True, also its uncertainty. 
+ """ + + # result + y = (X @ self.A_inf + self.b_inf) + if not return_std: + return y + # flat uncertainty + y_unc = self.u_inf[0,:] + # input-dependent uncertainty + y_eps = np.exp(X @ self.A_eps + y_unc) + return y, y_eps + def matching_ids(a: np.ndarray, b: np.ndarray, c: np.ndarray) -> np.ndarray: @@ -617,10 +614,14 @@ class Model(TransformerMixin, BaseEstimator): ('pca', PCA(n_pca_hr, whiten=True)), ('unc', UncertaintyHolder()), ]) - #self.fit_model = FitModel() - self.fit_model = MultiOutputWithStd(ARDRegression(n_iter=30, tol=1e-5, verbose=True)) + self.fit_model = Pipeline([ + #('fit', MultiOutputWithStd(ARDRegression(n_iter=30, tol=1e-6, verbose=True), n_jobs=10)), + ('fit', FitModel()), + ]) self.channel_mean = {ch: np.nan for ch in channels} + self.channel_pca = {ch: PCA(n_pca_lr, whiten=True) + for ch in channels} #self.channel_relevance = {ch: np.nan for ch in channels} @@ -705,23 +706,15 @@ class Model(TransformerMixin, BaseEstimator): self.x_model['unc'].set_uncertainty(low_pca_unc) # for consistency check - self.x_pca_mean = np.mean(low_pca, axis=0, keepdims=True) - self.x_pca_std = np.std(low_pca, axis=0, keepdims=True) + self.x_pca_mean = np.mean(low_res - low_pca_rec, axis=0, keepdims=True) + self.x_pca_std = np.std(low_res - low_pca_rec, axis=0, keepdims=True) # for consistency check per channel - #selection_model = self.x_model['select'] - #for channel in self.get_channels(): - # self.channel_mean[channel] = np.mean(low_res_data[channel], axis=0, keepdims=True) - # print(f"Calculate PCA relevance on {channel}") - # # freeze input data in one channel only - # low_res_data_frozen = {ch: low_res_data[ch] if ch != channel - # else np.repeat(self.channel_mean[channel], low_res_data[ch].shape[0], axis=0) - # for ch in self.get_channels()} - # low_res = selection_model.transform(low_res_data_frozen) - # low_pca = pca_model.fit_transform(low_res) - # low_pca_rec = pca_model.inverse_transform(low_pca) - # low_pca_unc = np.mean(np.sqrt(np.mean((low_res - low_pca_rec)**2, axis=1, keepdims=True)), axis=0, keepdims=True) - # self.channel_relevance[channel] = low_pca_unc + selection_model = self.x_model['select'] + low_res_selected = selection_model.transform(low_res_data, keep_dictionary_structure=True) + for channel in self.get_channels(): + print(f"Calculate PCA on {channel}") + low_pca = self.channel_pca[channel].fit_transform(low_res_selected[channel]) print("End of fit.") return high_res @@ -766,8 +759,12 @@ class Model(TransformerMixin, BaseEstimator): channels = list(low_res_data.keys()) # check if each channel is close to the mean low_res_selected = selection_model.transform(low_res_data, keep_dictionary_structure=True) - deviation = {ch: (low_res_selected[ch] - selection_model.mean[ch])/selection_model.std[ch] + # decorrelate it + deviation = {ch: self.channel_pca[ch].transform(low_res_selected[ch]) for ch in channels} + # just whiten it + #deviation = {ch: (low_res_selected[ch] - selection_model.mean[ch])/selection_model.std[ch] + # for ch in channels} ndof = {ch: float(deviation[ch].shape[1] - 1) for ch in channels} chi2 = {ch: np.sum(deviation[ch]**2, axis=1, keepdims=True) @@ -809,15 +806,15 @@ class Model(TransformerMixin, BaseEstimator): if 'fex' in self.x_model.named_steps: pca_model = self.x_model['fex'].named_steps['prepca'] low_pca = pca_model.transform(low_res) + low_pca_rec = pca_model.inverse_transform(low_pca) - deviation = (low_pca - self.x_pca_mean)/self.x_pca_std + deviation = (low_pca_rec - low_res - self.x_pca_mean)/self.x_pca_std ndof = 
float(deviation.shape[1] - 1) chi2 = np.sum(deviation**2, axis=1, keepdims=True) chi2_mu = scipy.stats.chi2.mean(ndof) chi2_sigma = scipy.stats.chi2.std(ndof) return (chi2 - chi2_mu)/chi2_sigma - #low_pca_rec = pca_model.inverse_transform(low_pca) #low_pca_unc = self.x_model['unc'].uncertainty() #fig = plt.figure(figsize=(8, 16)) @@ -879,6 +876,7 @@ class Model(TransformerMixin, BaseEstimator): DataHolder(self.channel_mean), DataHolder(self.x_pca_mean), DataHolder(self.x_pca_std), + self.channel_pca, #DataHolder(self.channel_relevance) ], filename, compress='zlib') @@ -892,7 +890,9 @@ class Model(TransformerMixin, BaseEstimator): Returns: A new model object. """ - x_model, y_model, fit_model, channel_mean, x_pca_mean, x_pca_std = joblib.load(filename) + (x_model, y_model, fit_model, + channel_mean, x_pca_mean, x_pca_std, + channel_pca) = joblib.load(filename) obj = Model() obj.x_model = x_model obj.y_model = y_model @@ -900,6 +900,7 @@ class Model(TransformerMixin, BaseEstimator): obj.channel_mean = channel_mean.get_data() obj.x_pca_mean = x_pca_mean.get_data() obj.x_pca_std = x_pca_std.get_data() + obj.channel_pca = channel_pca #obj.channel_relevance = channel_relevance.get_data() return obj diff --git a/pyproject.toml b/pyproject.toml index 6c239f4..fce19d2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,7 @@ dependencies = [ "numpy>=1.21", "scipy>=1.6", "scikit-learn>=1.0.2", - #"autograd", + "autograd", ] [project.optional-dependencies] -- GitLab
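
Note on the headline change: the patch replaces the old per-feature whitening in the consistency check with a per-channel whitened PCA, so correlated detector features are decorrelated before the chi-squared test and the statistic actually follows its nominal distribution. The following is a minimal standalone sketch of that check, outside the Model class; the function names and the dictionary layout are illustrative, not part of the repository API.

# Per-channel chi-squared consistency check, as introduced by this patch:
# fit one whitened PCA per channel on training spectra, then score new data
# by the chi-squared of the decorrelated components, normalized to a z-score.
import numpy as np
import scipy.stats
from sklearn.decomposition import PCA

def fit_channel_pcas(train_data: dict, n_components: int) -> dict:
    """Fit one whitened PCA per channel on the training data."""
    return {ch: PCA(n_components, whiten=True).fit(x)
            for ch, x in train_data.items()}

def consistency_zscore(channel_pcas: dict, new_data: dict) -> dict:
    """Return a chi-squared z-score per channel; large values flag outliers."""
    zscores = {}
    for ch, pca in channel_pcas.items():
        # whitened components are roughly standard-normal for training-like data
        deviation = pca.transform(new_data[ch])
        ndof = float(deviation.shape[1] - 1)
        chi2 = np.sum(deviation**2, axis=1, keepdims=True)
        zscores[ch] = (chi2 - scipy.stats.chi2.mean(ndof)) / scipy.stats.chi2.std(ndof)
    return zscores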
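
On the second change (switching back from ARDRegression to FitModel): the re-enabled FitModel minimizes a heteroscedastic Gaussian negative log-likelihood in which the per-output log-uncertainty is itself an affine function of the input, which is why it yields input-dependent uncertainties. Below is a sketch of that loss as a standalone helper, with parameter names mirroring the diff; it is illustrative only and omits the (commented-out) L2 regularization term.

# Heteroscedastic Gaussian negative log-likelihood, as minimized by FitModel.
import numpy as np

def heteroscedastic_nll(X, Y, A, b, A_eps, b_eps):
    """Mean NLL up to an additive constant.

    X: (N, Nx) inputs, Y: (N, Ny) targets,
    A: (Nx, Ny), b: (1, Ny)        -> prediction X @ A + b
    A_eps: (Nx, 1), b_eps: (1, Ny) -> log-sigma  X @ A_eps + b_eps
    """
    log_unc = X @ A_eps + b_eps        # input-dependent log of sigma
    inv_var = np.exp(-2.0 * log_unc)   # 1 / sigma**2
    residual = X @ A + b - Y
    # 0.5 * residual**2 / sigma**2 + log(sigma), summed over outputs, averaged over samples
    return np.mean(np.sum(0.5 * residual**2 * inv_var + log_unc, axis=1))

At prediction time the same parameters give both the estimate X @ A + b and its uncertainty exp(X @ A_eps + u), matching FitModel.predict in the diff; the actual fit uses autograd gradients with fmin_l_bfgs_b, which is why autograd is re-enabled in pyproject.toml.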