From 8b96c8bbf0732a9383180e6101f900cdad489f48 Mon Sep 17 00:00:00 2001 From: Danilo Ferreira de Lima <danilo.enoque.ferreira.de.lima@xfel.de> Date: Tue, 24 Jan 2023 15:54:05 +0100 Subject: [PATCH] Commented out older model to remove the autograd dependency. --- pes_to_spec/model.py | 316 ++++++++++++++++++++++--------------------- pyproject.toml | 4 +- 2 files changed, 164 insertions(+), 156 deletions(-) diff --git a/pes_to_spec/model.py b/pes_to_spec/model.py index 692c7ea..01890d1 100644 --- a/pes_to_spec/model.py +++ b/pes_to_spec/model.py @@ -3,8 +3,6 @@ from __future__ import annotations import joblib import numpy as np -from autograd import numpy as anp -from autograd import grad from scipy.signal import fftconvolve from scipy.signal import find_peaks_cwt from scipy.optimize import fmin_l_bfgs_b @@ -25,6 +23,168 @@ from joblib import Parallel, delayed from typing import Any, Dict, List, Optional, Union, Tuple +# previous method with a single matrix, but using gradient descent +# ARDRegression to be used instead +# advantages: +# * parallelization +# * using evidence maximization as an iterative method +# * automatic relevance determination to reduce overtraining +# +#from autograd import numpy as anp +#from autograd import grad +#class FitModel(RegressorMixin, BaseEstimator): +# """ +# Linear regression model with uncertainties. +# +# Args: +# l: Regularization coefficient. +# """ +# def __init__(self, l: float=1e-6): +# self.l = l +# +# # model parameter sizes +# self.Nx: int = 0 +# self.Ny: int = 0 +# +# # fit result +# self.A_inf: Optional[np.ndarray] = None +# self.b_inf: Optional[np.ndarray] = None +# self.u_inf: Optional[np.ndarray] = None +# +# # fit monitoring +# self.loss_train: List[float] = list() +# self.loss_test: List[float] = list() +# +# self.input_data = None +# +# def fit(self, X: np.ndarray, y: np.ndarray, **fit_params) -> RegressorMixin: +# """ +# Perform the fit and evaluate uncertainties with the test set. +# +# Args: +# X: The input. +# y: The target. +# fit_params: If it contains X_test and y_test, they are used to validate the model. +# +# Returns: The object itself. +# """ +# if 'X_test' in fit_params and 'y_test' in fit_params: +# X_test = fit_params['X_test'] +# y_test = fit_params['y_test'] +# else: +# X_test = X +# y_test = y +# +# # model parameter sizes +# self.Nx: int = int(X.shape[1]) +# self.Ny: int = int(y.shape[1]) +# +# # initial parameter values +# A0: np.ndarray = np.eye(self.Nx, self.Ny).reshape(self.Nx*self.Ny) +# b0: np.ndarray = np.zeros(self.Ny) +# Aeps: np.ndarray = np.zeros(self.Nx) +# u0: np.ndarray = np.zeros(self.Ny) +# x0: np.ndarray = np.concatenate((A0, b0, u0, Aeps), axis=0) +# +# # reset loss monitoring +# self.loss_train: List[float] = list() +# self.loss_test: List[float] = list() +# +# def loss(x: np.ndarray, X: np.ndarray, Y: np.ndarray) -> float: +# """ +# Calculate the loss function value for a given parameter set `x`. +# +# Args: +# x: The parameters as a flat array. +# X: The independent-variable dataset. +# Y: The dependent-variable dataset. +# +# Returns: The loss. +# """ +# # diag( (in @ x - out) @ (in @ x - out)^T ) +# A = x[:self.Nx*self.Ny].reshape((self.Nx, self.Ny)) +# b = x[self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape((1, self.Ny)) +# +# b_eps = x[(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape((1, self.Ny)) +# A_eps = x[(self.Nx*self.Ny+self.Ny+self.Ny):].reshape((self.Nx, 1)) +# log_unc = anp.matmul(X, A_eps) + b_eps +# +# #log_unc = anp.log(anp.exp(log_unc) + anp.exp(log_eps)) +# iunc2 = anp.exp(-2*log_unc) +# +# L = anp.mean( (0.5*((anp.matmul(X, A) + b - Y)**2)*iunc2 + log_unc).sum(axis=1), axis=0 ) +# weights2 = (anp.sum(anp.square(A.ravel())) +# #+ anp.sum(anp.square(b.ravel())) +# #+ anp.sum(anp.square(A_eps.ravel())) +# #+ anp.sum(anp.square(b_eps.ravel())) +# ) +# return L + self.l/2 * weights2 +# +# def loss_history(x: np.ndarray) -> float: +# """ +# Calculate the loss function and keep a history of it in training and testing. +# +# Args: +# x: Parameters flattened out. +# +# Returns: The loss value. +# """ +# l_train = loss(x, X, y) +# l_test = loss(x, X_test, y_test) +# +# self.loss_train += [l_train] +# self.loss_test += [l_test] +# return l_train +# +# def loss_train(x: np.ndarray) -> float: +# """ +# Calculate the loss function for the training dataset. +# +# Args: +# x: Parameters flattened out. +# +# Returns: The loss value. +# """ +# l_train = loss(x, X, y) +# return l_train +# +# grad_loss = grad(loss_train) +# sc_op = fmin_l_bfgs_b(loss_history, +# x0, +# grad_loss, +# disp=True, +# factr=1e7, +# maxiter=100, +# iprint=0) +# +# # Inference +# self.A_inf = sc_op[0][:self.Nx*self.Ny].reshape(self.Nx, self.Ny) +# self.b_inf = sc_op[0][self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape(1, self.Ny) +# self.u_inf = sc_op[0][(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape(1, self.Ny) # removed np.exp +# self.A_eps = sc_op[0][(self.Nx*self.Ny+self.Ny+self.Ny):].reshape(self.Nx, 1) +# +# def predict(self, X: np.ndarray, return_std: bool=False) -> Union[Tuple[np.ndarray, np.ndarray], np.ndarray]: +# """ +# Predict y from X. +# +# Args: +# X: Input dataset. +# +# Returns: Predicted Y and, if return_std is True, also its uncertainty. +# """ +# +# # result +# y = (X @ self.A_inf + self.b_inf) +# if not return_std: +# return y +# # flat uncertainty +# y_unc = self.u_inf[0,:] +# # input-dependent uncertainty +# y_eps = np.exp(X @ self.A_eps + y_unc) +# return y, y_eps + + + def matching_ids(a: np.ndarray, b: np.ndarray, c: np.ndarray) -> np.ndarray: """Returns list of train IDs common to sets a, b and c.""" unique_ids = list(set(a).intersection(b).intersection(c)) @@ -272,158 +432,6 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator): plt.savefig(filename) plt.close(fig) -class FitModel(RegressorMixin, BaseEstimator): - """ - Linear regression model with uncertainties. - - Args: - l: Regularization coefficient. - """ - def __init__(self, l: float=1e-6): - self.l = l - - # model parameter sizes - self.Nx: int = 0 - self.Ny: int = 0 - - # fit result - self.A_inf: Optional[np.ndarray] = None - self.b_inf: Optional[np.ndarray] = None - self.u_inf: Optional[np.ndarray] = None - - # fit monitoring - self.loss_train: List[float] = list() - self.loss_test: List[float] = list() - - self.input_data = None - - def fit(self, X: np.ndarray, y: np.ndarray, **fit_params) -> RegressorMixin: - """ - Perform the fit and evaluate uncertainties with the test set. - - Args: - X: The input. - y: The target. - fit_params: If it contains X_test and y_test, they are used to validate the model. - - Returns: The object itself. - """ - if 'X_test' in fit_params and 'y_test' in fit_params: - X_test = fit_params['X_test'] - y_test = fit_params['y_test'] - else: - X_test = X - y_test = y - - # model parameter sizes - self.Nx: int = int(X.shape[1]) - self.Ny: int = int(y.shape[1]) - - # initial parameter values - A0: np.ndarray = np.eye(self.Nx, self.Ny).reshape(self.Nx*self.Ny) - b0: np.ndarray = np.zeros(self.Ny) - Aeps: np.ndarray = np.zeros(self.Nx) - u0: np.ndarray = np.zeros(self.Ny) - x0: np.ndarray = np.concatenate((A0, b0, u0, Aeps), axis=0) - - # reset loss monitoring - self.loss_train: List[float] = list() - self.loss_test: List[float] = list() - - def loss(x: np.ndarray, X: np.ndarray, Y: np.ndarray) -> float: - """ - Calculate the loss function value for a given parameter set `x`. - - Args: - x: The parameters as a flat array. - X: The independent-variable dataset. - Y: The dependent-variable dataset. - - Returns: The loss. - """ - # diag( (in @ x - out) @ (in @ x - out)^T ) - A = x[:self.Nx*self.Ny].reshape((self.Nx, self.Ny)) - b = x[self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape((1, self.Ny)) - - b_eps = x[(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape((1, self.Ny)) - A_eps = x[(self.Nx*self.Ny+self.Ny+self.Ny):].reshape((self.Nx, 1)) - log_unc = anp.matmul(X, A_eps) + b_eps - - #log_unc = anp.log(anp.exp(log_unc) + anp.exp(log_eps)) - iunc2 = anp.exp(-2*log_unc) - - L = anp.mean( (0.5*((anp.matmul(X, A) + b - Y)**2)*iunc2 + log_unc).sum(axis=1), axis=0 ) - weights2 = (anp.sum(anp.square(A.ravel())) - #+ anp.sum(anp.square(b.ravel())) - #+ anp.sum(anp.square(A_eps.ravel())) - #+ anp.sum(anp.square(b_eps.ravel())) - ) - return L + self.l/2 * weights2 - - def loss_history(x: np.ndarray) -> float: - """ - Calculate the loss function and keep a history of it in training and testing. - - Args: - x: Parameters flattened out. - - Returns: The loss value. - """ - l_train = loss(x, X, y) - l_test = loss(x, X_test, y_test) - - self.loss_train += [l_train] - self.loss_test += [l_test] - return l_train - - def loss_train(x: np.ndarray) -> float: - """ - Calculate the loss function for the training dataset. - - Args: - x: Parameters flattened out. - - Returns: The loss value. - """ - l_train = loss(x, X, y) - return l_train - - grad_loss = grad(loss_train) - sc_op = fmin_l_bfgs_b(loss_history, - x0, - grad_loss, - disp=True, - factr=1e7, - maxiter=100, - iprint=0) - - # Inference - self.A_inf = sc_op[0][:self.Nx*self.Ny].reshape(self.Nx, self.Ny) - self.b_inf = sc_op[0][self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape(1, self.Ny) - self.u_inf = sc_op[0][(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape(1, self.Ny) # removed np.exp - self.A_eps = sc_op[0][(self.Nx*self.Ny+self.Ny+self.Ny):].reshape(self.Nx, 1) - - def predict(self, X: np.ndarray, return_std: bool=False) -> Union[Tuple[np.ndarray, np.ndarray], np.ndarray]: - """ - Predict y from X. - - Args: - X: Input dataset. - - Returns: Predicted Y and, if return_std is True, also its uncertainty. - """ - - # result - y = (X @ self.A_inf + self.b_inf) - if not return_std: - return y - # flat uncertainty - y_unc = self.u_inf[0,:] - # input-dependent uncertainty - y_eps = np.exp(X @ self.A_eps + y_unc) - return y, y_eps - - def _fit_estimator(estimator, X: np.ndarray, y: np.ndarray): estimator = clone(estimator) estimator.fit(X, y) diff --git a/pyproject.toml b/pyproject.toml index 5e730a1..e9a6551 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,8 +27,8 @@ dynamic = ["version", "readme"] dependencies = [ "numpy>=1.21", "scipy>=1.6", - "scikit-learn==1.0.2", - "autograd", + "scikit-learn>=1.0.2", + #"autograd", "h5py" ] -- GitLab