
Switched to ARDRegression to make the code more maintainable

Merged Danilo Enoque Ferreira de Lima requested to merge incrementalPCA into main
Files: 2 changed, +162 −154
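The hunks below drop the hand-rolled FitModel (kept as a commented-out reference). For orientation, here is a minimal sketch of an ARDRegression-based replacement, assuming scikit-learn's ARDRegression fitted independently per output column and parallelized with joblib (both already imported in this file); the helper names fit_ard_per_output and predict_with_std are illustrative and not part of this MR:

# Illustrative sketch, not the merged implementation: one ARDRegression per
# output column, fitted in parallel with joblib, then stacked for prediction.
import numpy as np
from joblib import Parallel, delayed
from sklearn.linear_model import ARDRegression

def fit_ard_per_output(X: np.ndarray, y: np.ndarray, n_jobs: int = -1) -> list:
    """Fit an independent ARDRegression for each column of y."""
    def fit_one(column: np.ndarray) -> ARDRegression:
        model = ARDRegression()
        model.fit(X, column)
        return model
    return Parallel(n_jobs=n_jobs)(delayed(fit_one)(y[:, k]) for k in range(y.shape[1]))

def predict_with_std(models: list, X: np.ndarray):
    """Stack per-output means and standard deviations from each fitted model."""
    pairs = [m.predict(X, return_std=True) for m in models]
    mean = np.stack([p[0] for p in pairs], axis=1)
    std = np.stack([p[1] for p in pairs], axis=1)
    return mean, std

ARDRegression.predict with return_std=True yields per-sample standard deviations, covering the role played by the uncertainty parameters (u_inf, A_eps) of the removed FitModel.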
@@ -3,8 +3,6 @@ from __future__ import annotations
import joblib
import numpy as np
from autograd import numpy as anp
from autograd import grad
from scipy.signal import fftconvolve
from scipy.signal import find_peaks_cwt
from scipy.optimize import fmin_l_bfgs_b
@@ -25,6 +23,168 @@ from joblib import Parallel, delayed
from typing import Any, Dict, List, Optional, Union, Tuple
# The previous method fit a single matrix with gradient descent.
# ARDRegression is used instead.
# Advantages:
# * parallelization
# * evidence maximization as an iterative fitting method
# * automatic relevance determination to reduce overfitting
#
#from autograd import numpy as anp
#from autograd import grad
#class FitModel(RegressorMixin, BaseEstimator):
#    """
#    Linear regression model with uncertainties.
#
#    Args:
#      l: Regularization coefficient.
#    """
#    def __init__(self, l: float=1e-6):
#        self.l = l
#
#        # model parameter sizes
#        self.Nx: int = 0
#        self.Ny: int = 0
#
#        # fit result
#        self.A_inf: Optional[np.ndarray] = None
#        self.b_inf: Optional[np.ndarray] = None
#        self.u_inf: Optional[np.ndarray] = None
#
#        # fit monitoring
#        self.loss_train: List[float] = list()
#        self.loss_test: List[float] = list()
#
#        self.input_data = None
#
#    def fit(self, X: np.ndarray, y: np.ndarray, **fit_params) -> RegressorMixin:
#        """
#        Perform the fit and evaluate uncertainties with the test set.
#
#        Args:
#          X: The input.
#          y: The target.
#          fit_params: If it contains X_test and y_test, they are used to validate the model.
#
#        Returns: The object itself.
#        """
#        if 'X_test' in fit_params and 'y_test' in fit_params:
#            X_test = fit_params['X_test']
#            y_test = fit_params['y_test']
#        else:
#            X_test = X
#            y_test = y
#
#        # model parameter sizes
#        self.Nx: int = int(X.shape[1])
#        self.Ny: int = int(y.shape[1])
#
#        # initial parameter values
#        A0: np.ndarray = np.eye(self.Nx, self.Ny).reshape(self.Nx*self.Ny)
#        b0: np.ndarray = np.zeros(self.Ny)
#        Aeps: np.ndarray = np.zeros(self.Nx)
#        u0: np.ndarray = np.zeros(self.Ny)
#        x0: np.ndarray = np.concatenate((A0, b0, u0, Aeps), axis=0)
#
#        # reset loss monitoring
#        self.loss_train: List[float] = list()
#        self.loss_test: List[float] = list()
#
#        def loss(x: np.ndarray, X: np.ndarray, Y: np.ndarray) -> float:
#            """
#            Calculate the loss function value for a given parameter set `x`.
#
#            Args:
#              x: The parameters as a flat array.
#              X: The independent-variable dataset.
#              Y: The dependent-variable dataset.
#
#            Returns: The loss.
#            """
#            # diag( (in @ x - out) @ (in @ x - out)^T )
#            A = x[:self.Nx*self.Ny].reshape((self.Nx, self.Ny))
#            b = x[self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape((1, self.Ny))
#
#            b_eps = x[(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape((1, self.Ny))
#            A_eps = x[(self.Nx*self.Ny+self.Ny+self.Ny):].reshape((self.Nx, 1))
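#            # these slices follow the layout of x0 built in fit(): A (Nx*Ny), b (Ny),
#            # then u0 (used here as b_eps, Ny) and Aeps (used as A_eps, Nx)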
#            log_unc = anp.matmul(X, A_eps) + b_eps
#
#            #log_unc = anp.log(anp.exp(log_unc) + anp.exp(log_eps))
#            iunc2 = anp.exp(-2*log_unc)
#
#            L = anp.mean( (0.5*((anp.matmul(X, A) + b - Y)**2)*iunc2 + log_unc).sum(axis=1), axis=0 )
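#            # up to an additive constant, L is the mean per-sample negative log-likelihood
#            # of a Gaussian whose per-output log standard deviation is log_unc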
#            weights2 = (anp.sum(anp.square(A.ravel()))
#                        #+ anp.sum(anp.square(b.ravel()))
#                        #+ anp.sum(anp.square(A_eps.ravel()))
#                        #+ anp.sum(anp.square(b_eps.ravel()))
#                       )
#            return L + self.l/2 * weights2
#
#        def loss_history(x: np.ndarray) -> float:
#            """
#            Calculate the loss function and keep a history of it in training and testing.
#
#            Args:
#              x: Parameters flattened out.
#
#            Returns: The loss value.
#            """
#            l_train = loss(x, X, y)
#            l_test = loss(x, X_test, y_test)
#
#            self.loss_train += [l_train]
#            self.loss_test += [l_test]
#            return l_train
#
#        def loss_train(x: np.ndarray) -> float:
#            """
#            Calculate the loss function for the training dataset.
#
#            Args:
#              x: Parameters flattened out.
#
#            Returns: The loss value.
#            """
#            l_train = loss(x, X, y)
#            return l_train
#
#        grad_loss = grad(loss_train)
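#        # autograd differentiates loss_train (the pure training loss); loss_history
#        # returns the same value but also records the train/test losses per call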
#        sc_op = fmin_l_bfgs_b(loss_history,
#                              x0,
#                              grad_loss,
#                              disp=True,
#                              factr=1e7,
#                              maxiter=100,
#                              iprint=0)
#
#        # Inference
#        self.A_inf = sc_op[0][:self.Nx*self.Ny].reshape(self.Nx, self.Ny)
#        self.b_inf = sc_op[0][self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape(1, self.Ny)
#        self.u_inf = sc_op[0][(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape(1, self.Ny) # removed np.exp
#        self.A_eps = sc_op[0][(self.Nx*self.Ny+self.Ny+self.Ny):].reshape(self.Nx, 1)
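#        # u_inf is stored in log space (hence "removed np.exp" above); predict()
#        # applies np.exp to X @ A_eps + u_inf to recover the uncertainty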
#
#    def predict(self, X: np.ndarray, return_std: bool=False) -> Union[Tuple[np.ndarray, np.ndarray], np.ndarray]:
#        """
#        Predict y from X.
#
#        Args:
#          X: Input dataset.
#
#        Returns: Predicted Y and, if return_std is True, also its uncertainty.
#        """
#
#        # result
#        y = (X @ self.A_inf + self.b_inf)
#        if not return_std:
#            return y
#        # flat uncertainty
#        y_unc = self.u_inf[0,:]
#        # input-dependent uncertainty
#        y_eps = np.exp(X @ self.A_eps + y_unc)
#        return y, y_eps
def matching_ids(a: np.ndarray, b: np.ndarray, c: np.ndarray) -> np.ndarray:
"""Returns list of train IDs common to sets a, b and c."""
unique_ids = list(set(a).intersection(b).intersection(c))
@@ -272,158 +432,6 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
        plt.savefig(filename)
        plt.close(fig)
class FitModel(RegressorMixin, BaseEstimator):
    """
    Linear regression model with uncertainties.

    Args:
      l: Regularization coefficient.
    """
    def __init__(self, l: float=1e-6):
        self.l = l

        # model parameter sizes
        self.Nx: int = 0
        self.Ny: int = 0

        # fit result
        self.A_inf: Optional[np.ndarray] = None
        self.b_inf: Optional[np.ndarray] = None
        self.u_inf: Optional[np.ndarray] = None

        # fit monitoring
        self.loss_train: List[float] = list()
        self.loss_test: List[float] = list()

        self.input_data = None

    def fit(self, X: np.ndarray, y: np.ndarray, **fit_params) -> RegressorMixin:
        """
        Perform the fit and evaluate uncertainties with the test set.

        Args:
          X: The input.
          y: The target.
          fit_params: If it contains X_test and y_test, they are used to validate the model.

        Returns: The object itself.
        """
        if 'X_test' in fit_params and 'y_test' in fit_params:
            X_test = fit_params['X_test']
            y_test = fit_params['y_test']
        else:
            X_test = X
            y_test = y

        # model parameter sizes
        self.Nx: int = int(X.shape[1])
        self.Ny: int = int(y.shape[1])

        # initial parameter values
        A0: np.ndarray = np.eye(self.Nx, self.Ny).reshape(self.Nx*self.Ny)
        b0: np.ndarray = np.zeros(self.Ny)
        Aeps: np.ndarray = np.zeros(self.Nx)
        u0: np.ndarray = np.zeros(self.Ny)
        x0: np.ndarray = np.concatenate((A0, b0, u0, Aeps), axis=0)

        # reset loss monitoring
        self.loss_train: List[float] = list()
        self.loss_test: List[float] = list()

        def loss(x: np.ndarray, X: np.ndarray, Y: np.ndarray) -> float:
            """
            Calculate the loss function value for a given parameter set `x`.

            Args:
              x: The parameters as a flat array.
              X: The independent-variable dataset.
              Y: The dependent-variable dataset.

            Returns: The loss.
            """
            # diag( (in @ x - out) @ (in @ x - out)^T )
            A = x[:self.Nx*self.Ny].reshape((self.Nx, self.Ny))
            b = x[self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape((1, self.Ny))

            b_eps = x[(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape((1, self.Ny))
            A_eps = x[(self.Nx*self.Ny+self.Ny+self.Ny):].reshape((self.Nx, 1))
            log_unc = anp.matmul(X, A_eps) + b_eps

            #log_unc = anp.log(anp.exp(log_unc) + anp.exp(log_eps))
            iunc2 = anp.exp(-2*log_unc)

            L = anp.mean( (0.5*((anp.matmul(X, A) + b - Y)**2)*iunc2 + log_unc).sum(axis=1), axis=0 )
            weights2 = (anp.sum(anp.square(A.ravel()))
                        #+ anp.sum(anp.square(b.ravel()))
                        #+ anp.sum(anp.square(A_eps.ravel()))
                        #+ anp.sum(anp.square(b_eps.ravel()))
                       )
            return L + self.l/2 * weights2

        def loss_history(x: np.ndarray) -> float:
            """
            Calculate the loss function and keep a history of it in training and testing.

            Args:
              x: Parameters flattened out.

            Returns: The loss value.
            """
            l_train = loss(x, X, y)
            l_test = loss(x, X_test, y_test)

            self.loss_train += [l_train]
            self.loss_test += [l_test]
            return l_train

        def loss_train(x: np.ndarray) -> float:
            """
            Calculate the loss function for the training dataset.

            Args:
              x: Parameters flattened out.

            Returns: The loss value.
            """
            l_train = loss(x, X, y)
            return l_train

        grad_loss = grad(loss_train)
        sc_op = fmin_l_bfgs_b(loss_history,
                              x0,
                              grad_loss,
                              disp=True,
                              factr=1e7,
                              maxiter=100,
                              iprint=0)

        # Inference
        self.A_inf = sc_op[0][:self.Nx*self.Ny].reshape(self.Nx, self.Ny)
        self.b_inf = sc_op[0][self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape(1, self.Ny)
        self.u_inf = sc_op[0][(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape(1, self.Ny) # removed np.exp
        self.A_eps = sc_op[0][(self.Nx*self.Ny+self.Ny+self.Ny):].reshape(self.Nx, 1)

    def predict(self, X: np.ndarray, return_std: bool=False) -> Union[Tuple[np.ndarray, np.ndarray], np.ndarray]:
        """
        Predict y from X.

        Args:
          X: Input dataset.

        Returns: Predicted Y and, if return_std is True, also its uncertainty.
        """

        # result
        y = (X @ self.A_inf + self.b_inf)
        if not return_std:
            return y
        # flat uncertainty
        y_unc = self.u_inf[0,:]
        # input-dependent uncertainty
        y_eps = np.exp(X @ self.A_eps + y_unc)
        return y, y_eps
def _fit_estimator(estimator, X: np.ndarray, y: np.ndarray):
    estimator = clone(estimator)
    estimator.fit(X, y)