Skip to content
Snippets Groups Projects

Switched to ARDRegression to keep the code more maintainable

Merged Danilo Enoque Ferreira de Lima requested to merge incrementalPCA into main
2 files
+ 4
15
Compare changes
  • Side-by-side
  • Inline
Files
2
+ 165
165
@@ -3,12 +3,10 @@ from __future__ import annotations
import joblib
import numpy as np
from autograd import numpy as anp
from autograd import grad
from scipy.signal import fftconvolve
from scipy.signal import find_peaks_cwt
from scipy.optimize import fmin_l_bfgs_b
from sklearn.decomposition import PCA
from sklearn.decomposition import PCA, IncrementalPCA
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.base import RegressorMixin
@@ -23,10 +21,170 @@ from sklearn.model_selection import train_test_split
from sklearn.base import clone, MetaEstimatorMixin
from joblib import Parallel, delayed
import matplotlib.pyplot as plt
from typing import Any, Dict, List, Optional, Union, Tuple
# previous method with a single matrix, but using gradient descent
# ARDRegression to be used instead
# advantages:
# * parallelization
# * using evidence maximization as an iterative method
# * automatic relevance determination to reduce overtraining
#
#from autograd import numpy as anp
#from autograd import grad
#class FitModel(RegressorMixin, BaseEstimator):
# """
# Linear regression model with uncertainties.
#
# Args:
# l: Regularization coefficient.
# """
# def __init__(self, l: float=1e-6):
# self.l = l
#
# # model parameter sizes
# self.Nx: int = 0
# self.Ny: int = 0
#
# # fit result
# self.A_inf: Optional[np.ndarray] = None
# self.b_inf: Optional[np.ndarray] = None
# self.u_inf: Optional[np.ndarray] = None
#
# # fit monitoring
# self.loss_train: List[float] = list()
# self.loss_test: List[float] = list()
#
# self.input_data = None
#
# def fit(self, X: np.ndarray, y: np.ndarray, **fit_params) -> RegressorMixin:
# """
# Perform the fit and evaluate uncertainties with the test set.
#
# Args:
# X: The input.
# y: The target.
# fit_params: If it contains X_test and y_test, they are used to validate the model.
#
# Returns: The object itself.
# """
# if 'X_test' in fit_params and 'y_test' in fit_params:
# X_test = fit_params['X_test']
# y_test = fit_params['y_test']
# else:
# X_test = X
# y_test = y
#
# # model parameter sizes
# self.Nx: int = int(X.shape[1])
# self.Ny: int = int(y.shape[1])
#
# # initial parameter values
# A0: np.ndarray = np.eye(self.Nx, self.Ny).reshape(self.Nx*self.Ny)
# b0: np.ndarray = np.zeros(self.Ny)
# Aeps: np.ndarray = np.zeros(self.Nx)
# u0: np.ndarray = np.zeros(self.Ny)
# x0: np.ndarray = np.concatenate((A0, b0, u0, Aeps), axis=0)
#
# # reset loss monitoring
# self.loss_train: List[float] = list()
# self.loss_test: List[float] = list()
#
# def loss(x: np.ndarray, X: np.ndarray, Y: np.ndarray) -> float:
# """
# Calculate the loss function value for a given parameter set `x`.
#
# Args:
# x: The parameters as a flat array.
# X: The independent-variable dataset.
# Y: The dependent-variable dataset.
#
# Returns: The loss.
# """
# # diag( (in @ x - out) @ (in @ x - out)^T )
# A = x[:self.Nx*self.Ny].reshape((self.Nx, self.Ny))
# b = x[self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape((1, self.Ny))
#
# b_eps = x[(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape((1, self.Ny))
# A_eps = x[(self.Nx*self.Ny+self.Ny+self.Ny):].reshape((self.Nx, 1))
# log_unc = anp.matmul(X, A_eps) + b_eps
#
# #log_unc = anp.log(anp.exp(log_unc) + anp.exp(log_eps))
# iunc2 = anp.exp(-2*log_unc)
#
# L = anp.mean( (0.5*((anp.matmul(X, A) + b - Y)**2)*iunc2 + log_unc).sum(axis=1), axis=0 )
# weights2 = (anp.sum(anp.square(A.ravel()))
# #+ anp.sum(anp.square(b.ravel()))
# #+ anp.sum(anp.square(A_eps.ravel()))
# #+ anp.sum(anp.square(b_eps.ravel()))
# )
# return L + self.l/2 * weights2
#
# def loss_history(x: np.ndarray) -> float:
# """
# Calculate the loss function and keep a history of it in training and testing.
#
# Args:
# x: Parameters flattened out.
#
# Returns: The loss value.
# """
# l_train = loss(x, X, y)
# l_test = loss(x, X_test, y_test)
#
# self.loss_train += [l_train]
# self.loss_test += [l_test]
# return l_train
#
# def loss_train(x: np.ndarray) -> float:
# """
# Calculate the loss function for the training dataset.
#
# Args:
# x: Parameters flattened out.
#
# Returns: The loss value.
# """
# l_train = loss(x, X, y)
# return l_train
#
# grad_loss = grad(loss_train)
# sc_op = fmin_l_bfgs_b(loss_history,
# x0,
# grad_loss,
# disp=True,
# factr=1e7,
# maxiter=100,
# iprint=0)
#
# # Inference
# self.A_inf = sc_op[0][:self.Nx*self.Ny].reshape(self.Nx, self.Ny)
# self.b_inf = sc_op[0][self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape(1, self.Ny)
# self.u_inf = sc_op[0][(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape(1, self.Ny) # removed np.exp
# self.A_eps = sc_op[0][(self.Nx*self.Ny+self.Ny+self.Ny):].reshape(self.Nx, 1)
#
# def predict(self, X: np.ndarray, return_std: bool=False) -> Union[Tuple[np.ndarray, np.ndarray], np.ndarray]:
# """
# Predict y from X.
#
# Args:
# X: Input dataset.
#
# Returns: Predicted Y and, if return_std is True, also its uncertainty.
# """
#
# # result
# y = (X @ self.A_inf + self.b_inf)
# if not return_std:
# return y
# # flat uncertainty
# y_unc = self.u_inf[0,:]
# # input-dependent uncertainty
# y_eps = np.exp(X @ self.A_eps + y_unc)
# return y, y_eps
def matching_ids(a: np.ndarray, b: np.ndarray, c: np.ndarray) -> np.ndarray:
"""Returns list of train IDs common to sets a, b and c."""
unique_ids = list(set(a).intersection(b).intersection(c))
@@ -71,7 +229,6 @@ class HighResolutionSmoother(TransformerMixin, BaseEstimator):
Returns: The object itself.
"""
print("Storing high resolution energy")
self.energy = np.copy(fit_params["energy"])
if len(self.energy.shape) == 2:
self.energy = self.energy[0,:]
@@ -86,7 +243,6 @@ class HighResolutionSmoother(TransformerMixin, BaseEstimator):
Returns: Smoothened out spectrum.
"""
print("Smoothing high-resolution spectrum")
if self.high_res_sigma <= 0:
return X
# use a default energy axis is none is given
@@ -198,7 +354,6 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
Returns: Concatenated and pre-processed low-resolution data of shape (train_id, features).
"""
print("Selecting area close to the peak")
if self.tof_start is None:
raise NotImplementedError("The low-resolution data cannot be transformed before the prompt has been identified. Call the fit function first.")
items = [X[k] for k in self.channels]
@@ -244,7 +399,6 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
Returns: The object itself.
"""
print("Estimating peak position")
self.tof_start = self.estimate_prompt_peak(X)
return self
@@ -259,6 +413,7 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
"""
sum_low_res = - np.mean(sum(list(X.values())), axis=0)
peak_idx = self.estimate_prompt_peak(X)
import matplotlib.pyplot as plt
fig = plt.figure(figsize=(8, 16))
ax = plt.gca()
ax.plot(np.arange(peak_idx-100, peak_idx+300),
@@ -277,158 +432,6 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
plt.savefig(filename)
plt.close(fig)
class FitModel(RegressorMixin, BaseEstimator):
"""
Linear regression model with uncertainties.
Args:
l: Regularization coefficient.
"""
def __init__(self, l: float=1e-6):
self.l = l
# model parameter sizes
self.Nx: int = 0
self.Ny: int = 0
# fit result
self.A_inf: Optional[np.ndarray] = None
self.b_inf: Optional[np.ndarray] = None
self.u_inf: Optional[np.ndarray] = None
# fit monitoring
self.loss_train: List[float] = list()
self.loss_test: List[float] = list()
self.input_data = None
def fit(self, X: np.ndarray, y: np.ndarray, **fit_params) -> RegressorMixin:
"""
Perform the fit and evaluate uncertainties with the test set.
Args:
X: The input.
y: The target.
fit_params: If it contains X_test and y_test, they are used to validate the model.
Returns: The object itself.
"""
if 'X_test' in fit_params and 'y_test' in fit_params:
X_test = fit_params['X_test']
y_test = fit_params['y_test']
else:
X_test = X
y_test = y
# model parameter sizes
self.Nx: int = int(X.shape[1])
self.Ny: int = int(y.shape[1])
# initial parameter values
A0: np.ndarray = np.eye(self.Nx, self.Ny).reshape(self.Nx*self.Ny)
b0: np.ndarray = np.zeros(self.Ny)
Aeps: np.ndarray = np.zeros(self.Nx)
u0: np.ndarray = np.zeros(self.Ny)
x0: np.ndarray = np.concatenate((A0, b0, u0, Aeps), axis=0)
# reset loss monitoring
self.loss_train: List[float] = list()
self.loss_test: List[float] = list()
def loss(x: np.ndarray, X: np.ndarray, Y: np.ndarray) -> float:
"""
Calculate the loss function value for a given parameter set `x`.
Args:
x: The parameters as a flat array.
X: The independent-variable dataset.
Y: The dependent-variable dataset.
Returns: The loss.
"""
# diag( (in @ x - out) @ (in @ x - out)^T )
A = x[:self.Nx*self.Ny].reshape((self.Nx, self.Ny))
b = x[self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape((1, self.Ny))
b_eps = x[(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape((1, self.Ny))
A_eps = x[(self.Nx*self.Ny+self.Ny+self.Ny):].reshape((self.Nx, 1))
log_unc = anp.matmul(X, A_eps) + b_eps
#log_unc = anp.log(anp.exp(log_unc) + anp.exp(log_eps))
iunc2 = anp.exp(-2*log_unc)
L = anp.mean( (0.5*((anp.matmul(X, A) + b - Y)**2)*iunc2 + log_unc).sum(axis=1), axis=0 )
weights2 = (anp.sum(anp.square(A.ravel()))
#+ anp.sum(anp.square(b.ravel()))
#+ anp.sum(anp.square(A_eps.ravel()))
#+ anp.sum(anp.square(b_eps.ravel()))
)
return L + self.l/2 * weights2
def loss_history(x: np.ndarray) -> float:
"""
Calculate the loss function and keep a history of it in training and testing.
Args:
x: Parameters flattened out.
Returns: The loss value.
"""
l_train = loss(x, X, y)
l_test = loss(x, X_test, y_test)
self.loss_train += [l_train]
self.loss_test += [l_test]
return l_train
def loss_train(x: np.ndarray) -> float:
"""
Calculate the loss function for the training dataset.
Args:
x: Parameters flattened out.
Returns: The loss value.
"""
l_train = loss(x, X, y)
return l_train
grad_loss = grad(loss_train)
sc_op = fmin_l_bfgs_b(loss_history,
x0,
grad_loss,
disp=True,
factr=1e7,
maxiter=100,
iprint=0)
# Inference
self.A_inf = sc_op[0][:self.Nx*self.Ny].reshape(self.Nx, self.Ny)
self.b_inf = sc_op[0][self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape(1, self.Ny)
self.u_inf = sc_op[0][(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape(1, self.Ny) # removed np.exp
self.A_eps = sc_op[0][(self.Nx*self.Ny+self.Ny+self.Ny):].reshape(self.Nx, 1)
def predict(self, X: np.ndarray, return_std: bool=False) -> Union[Tuple[np.ndarray, np.ndarray], np.ndarray]:
"""
Predict y from X.
Args:
X: Input dataset.
Returns: Predicted Y and, if return_std is True, also its uncertainty.
"""
# result
y = (X @ self.A_inf + self.b_inf)
if not return_std:
return y
# flat uncertainty
y_unc = self.u_inf[0,:]
# input-dependent uncertainty
y_eps = np.exp(X @ self.A_eps + y_unc)
return y, y_eps
def _fit_estimator(estimator, X: np.ndarray, y: np.ndarray):
estimator = clone(estimator)
estimator.fit(X, y)
@@ -436,7 +439,7 @@ def _fit_estimator(estimator, X: np.ndarray, y: np.ndarray):
class MultiOutputWithStd(MetaEstimatorMixin, BaseEstimator):
def __init__(self, estimator, *, n_jobs=8):
def __init__(self, estimator, *, n_jobs=-1):
self.estimator = estimator
self.n_jobs = n_jobs
@@ -458,14 +461,12 @@ class MultiOutputWithStd(MetaEstimatorMixin, BaseEstimator):
"multi-output regression but has only one."
)
print(f"Fitting multiple regressors with n_jobs={self.n_jobs}")
self.estimators_ = Parallel(n_jobs=self.n_jobs)(
delayed(_fit_estimator)(
self.estimator, X, y[:, i]
)
for i in range(y.shape[1])
)
print("End of fit")
return self
@@ -480,7 +481,6 @@ class MultiOutputWithStd(MetaEstimatorMixin, BaseEstimator):
Multi-output targets predicted across multiple predictors.
Note: Separate models are generated for each predictor.
"""
print("Inferring ...")
y = Parallel(n_jobs=self.n_jobs)(
delayed(e.predict)(X, return_std) for e in self.estimators_
#delayed(e.predict)(X) for e in self.estimators_
Loading