diff --git a/pes_to_spec/model.py b/pes_to_spec/model.py
index ff640e142cfdef3ebded288f58fb15d62da7c8f7..01890d1829b4abb67f0481f5a42fc2c9afe8d61f 100644
--- a/pes_to_spec/model.py
+++ b/pes_to_spec/model.py
@@ -3,12 +3,10 @@ from __future__ import annotations
 import joblib
 
 import numpy as np
-from autograd import numpy as anp
-from autograd import grad
 from scipy.signal import fftconvolve
 from scipy.signal import find_peaks_cwt
 from scipy.optimize import fmin_l_bfgs_b
-from sklearn.decomposition import PCA
+from sklearn.decomposition import PCA, IncrementalPCA
 from sklearn.pipeline import Pipeline, FeatureUnion
 from sklearn.base import TransformerMixin, BaseEstimator
 from sklearn.base import RegressorMixin
@@ -23,10 +21,170 @@ from sklearn.model_selection import train_test_split
 from sklearn.base import clone, MetaEstimatorMixin
 from joblib import Parallel, delayed
 
-import matplotlib.pyplot as plt
-
 from typing import Any, Dict, List, Optional, Union, Tuple
 
+# previous method with a single matrix, but using gradient descent
+# ARDRegression to be used instead
+# advantages:
+# * parallelization
+# * using evidence maximization as an iterative method
+# * automatic relevance determination to reduce overtraining
+#
+#from autograd import numpy as anp
+#from autograd import grad
+#class FitModel(RegressorMixin, BaseEstimator):
+#    """
+#    Linear regression model with uncertainties.
+#
+#    Args:
+#      l: Regularization coefficient.
+#    """
+#    def __init__(self, l: float=1e-6):
+#        self.l = l
+#
+#        # model parameter sizes
+#        self.Nx: int = 0
+#        self.Ny: int = 0
+#
+#        # fit result
+#        self.A_inf: Optional[np.ndarray] = None
+#        self.b_inf: Optional[np.ndarray] = None
+#        self.u_inf: Optional[np.ndarray] = None
+#
+#        # fit monitoring
+#        self.loss_train: List[float] = list()
+#        self.loss_test: List[float] = list()
+#
+#        self.input_data = None
+#
+#    def fit(self, X: np.ndarray, y: np.ndarray, **fit_params) -> RegressorMixin:
+#        """
+#        Perform the fit and evaluate uncertainties with the test set.
+#
+#        Args:
+#          X: The input.
+#          y: The target.
+#          fit_params: If it contains X_test and y_test, they are used to validate the model.
+#
+#        Returns: The object itself.
+#        """
+#        if 'X_test' in fit_params and 'y_test' in fit_params:
+#            X_test = fit_params['X_test']
+#            y_test = fit_params['y_test']
+#        else:
+#            X_test = X
+#            y_test = y
+#
+#        # model parameter sizes
+#        self.Nx: int = int(X.shape[1])
+#        self.Ny: int = int(y.shape[1])
+#
+#        # initial parameter values
+#        A0: np.ndarray = np.eye(self.Nx, self.Ny).reshape(self.Nx*self.Ny)
+#        b0: np.ndarray = np.zeros(self.Ny)
+#        Aeps: np.ndarray = np.zeros(self.Nx)
+#        u0: np.ndarray = np.zeros(self.Ny)
+#        x0: np.ndarray = np.concatenate((A0, b0, u0, Aeps), axis=0)
+#
+#        # reset loss monitoring
+#        self.loss_train: List[float] = list()
+#        self.loss_test: List[float] = list()
+#
+#        def loss(x: np.ndarray, X: np.ndarray, Y: np.ndarray) -> float:
+#            """
+#            Calculate the loss function value for a given parameter set `x`.
+#
+#            Args:
+#              x: The parameters as a flat array.
+#              X: The independent-variable dataset.
+#              Y: The dependent-variable dataset.
+#
+#            Returns: The loss.
+#            """
+#            # diag( (in @ x - out) @ (in @ x - out)^T )
+#            A = x[:self.Nx*self.Ny].reshape((self.Nx, self.Ny))
+#            b = x[self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape((1, self.Ny))
+#
+#            b_eps = x[(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape((1, self.Ny))
+#            A_eps = x[(self.Nx*self.Ny+self.Ny+self.Ny):].reshape((self.Nx, 1))
+#            log_unc = anp.matmul(X, A_eps) + b_eps
+#
+#            #log_unc = anp.log(anp.exp(log_unc) + anp.exp(log_eps))
+#            iunc2 = anp.exp(-2*log_unc)
+#
+#            L = anp.mean( (0.5*((anp.matmul(X, A) + b - Y)**2)*iunc2 + log_unc).sum(axis=1), axis=0 )
+#            weights2 = (anp.sum(anp.square(A.ravel()))
+#                        #+ anp.sum(anp.square(b.ravel()))
+#                        #+ anp.sum(anp.square(A_eps.ravel()))
+#                        #+ anp.sum(anp.square(b_eps.ravel()))
+#                        )
+#            return L + self.l/2 * weights2
+#
+#        def loss_history(x: np.ndarray) -> float:
+#            """
+#            Calculate the loss function and keep a history of it in training and testing.
+#
+#            Args:
+#              x: Parameters flattened out.
+#
+#            Returns: The loss value.
+#            """
+#            l_train = loss(x, X, y)
+#            l_test = loss(x, X_test, y_test)
+#
+#            self.loss_train += [l_train]
+#            self.loss_test += [l_test]
+#            return l_train
+#
+#        def loss_train(x: np.ndarray) -> float:
+#            """
+#            Calculate the loss function for the training dataset.
+#
+#            Args:
+#              x: Parameters flattened out.
+#
+#            Returns: The loss value.
+#            """
+#            l_train = loss(x, X, y)
+#            return l_train
+#
+#        grad_loss = grad(loss_train)
+#        sc_op = fmin_l_bfgs_b(loss_history,
+#                              x0,
+#                              grad_loss,
+#                              disp=True,
+#                              factr=1e7,
+#                              maxiter=100,
+#                              iprint=0)
+#
+#        # Inference
+#        self.A_inf = sc_op[0][:self.Nx*self.Ny].reshape(self.Nx, self.Ny)
+#        self.b_inf = sc_op[0][self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape(1, self.Ny)
+#        self.u_inf = sc_op[0][(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape(1, self.Ny) # removed np.exp
+#        self.A_eps = sc_op[0][(self.Nx*self.Ny+self.Ny+self.Ny):].reshape(self.Nx, 1)
+#
+#    def predict(self, X: np.ndarray, return_std: bool=False) -> Union[Tuple[np.ndarray, np.ndarray], np.ndarray]:
+#        """
+#        Predict y from X.
+#
+#        Args:
+#          X: Input dataset.
+#
+#        Returns: Predicted Y and, if return_std is True, also its uncertainty.
+#        """
+#
+#        # result
+#        y = (X @ self.A_inf + self.b_inf)
+#        if not return_std:
+#            return y
+#        # flat uncertainty
+#        y_unc = self.u_inf[0,:]
+#        # input-dependent uncertainty
+#        y_eps = np.exp(X @ self.A_eps + y_unc)
+#        return y, y_eps
+
+
+
 def matching_ids(a: np.ndarray, b: np.ndarray, c: np.ndarray) -> np.ndarray:
     """Returns list of train IDs common to sets a, b and c."""
     unique_ids = list(set(a).intersection(b).intersection(c))
@@ -71,7 +229,6 @@ class HighResolutionSmoother(TransformerMixin, BaseEstimator):
 
         Returns: The object itself.
         """
-        print("Storing high resolution energy")
         self.energy = np.copy(fit_params["energy"])
         if len(self.energy.shape) == 2:
             self.energy = self.energy[0,:]
@@ -86,7 +243,6 @@ class HighResolutionSmoother(TransformerMixin, BaseEstimator):
 
         Returns: Smoothened out spectrum.
         """
-        print("Smoothing high-resolution spectrum")
         if self.high_res_sigma <= 0:
             return X
         # use a default energy axis is none is given
@@ -198,7 +354,6 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
 
         Returns: Concatenated and pre-processed low-resolution data of shape (train_id, features).
         """
-        print("Selecting area close to the peak")
         if self.tof_start is None:
             raise NotImplementedError("The low-resolution data cannot be transformed before the prompt has been identified. Call the fit function first.")
         items = [X[k] for k in self.channels]
@@ -244,7 +399,6 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
 
         Returns: The object itself.
        """
-        print("Estimating peak position")
         self.tof_start = self.estimate_prompt_peak(X)
         return self
 
@@ -259,6 +413,7 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
         """
         sum_low_res = - np.mean(sum(list(X.values())), axis=0)
         peak_idx = self.estimate_prompt_peak(X)
+        import matplotlib.pyplot as plt
         fig = plt.figure(figsize=(8, 16))
         ax = plt.gca()
         ax.plot(np.arange(peak_idx-100, peak_idx+300),
@@ -277,158 +432,6 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
         plt.savefig(filename)
         plt.close(fig)
 
-class FitModel(RegressorMixin, BaseEstimator):
-    """
-    Linear regression model with uncertainties.
-
-    Args:
-      l: Regularization coefficient.
-    """
-    def __init__(self, l: float=1e-6):
-        self.l = l
-
-        # model parameter sizes
-        self.Nx: int = 0
-        self.Ny: int = 0
-
-        # fit result
-        self.A_inf: Optional[np.ndarray] = None
-        self.b_inf: Optional[np.ndarray] = None
-        self.u_inf: Optional[np.ndarray] = None
-
-        # fit monitoring
-        self.loss_train: List[float] = list()
-        self.loss_test: List[float] = list()
-
-        self.input_data = None
-
-    def fit(self, X: np.ndarray, y: np.ndarray, **fit_params) -> RegressorMixin:
-        """
-        Perform the fit and evaluate uncertainties with the test set.
-
-        Args:
-          X: The input.
-          y: The target.
-          fit_params: If it contains X_test and y_test, they are used to validate the model.
-
-        Returns: The object itself.
-        """
-        if 'X_test' in fit_params and 'y_test' in fit_params:
-            X_test = fit_params['X_test']
-            y_test = fit_params['y_test']
-        else:
-            X_test = X
-            y_test = y
-
-        # model parameter sizes
-        self.Nx: int = int(X.shape[1])
-        self.Ny: int = int(y.shape[1])
-
-        # initial parameter values
-        A0: np.ndarray = np.eye(self.Nx, self.Ny).reshape(self.Nx*self.Ny)
-        b0: np.ndarray = np.zeros(self.Ny)
-        Aeps: np.ndarray = np.zeros(self.Nx)
-        u0: np.ndarray = np.zeros(self.Ny)
-        x0: np.ndarray = np.concatenate((A0, b0, u0, Aeps), axis=0)
-
-        # reset loss monitoring
-        self.loss_train: List[float] = list()
-        self.loss_test: List[float] = list()
-
-        def loss(x: np.ndarray, X: np.ndarray, Y: np.ndarray) -> float:
-            """
-            Calculate the loss function value for a given parameter set `x`.
-
-            Args:
-              x: The parameters as a flat array.
-              X: The independent-variable dataset.
-              Y: The dependent-variable dataset.
-
-            Returns: The loss.
-            """
-            # diag( (in @ x - out) @ (in @ x - out)^T )
-            A = x[:self.Nx*self.Ny].reshape((self.Nx, self.Ny))
-            b = x[self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape((1, self.Ny))
-
-            b_eps = x[(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape((1, self.Ny))
-            A_eps = x[(self.Nx*self.Ny+self.Ny+self.Ny):].reshape((self.Nx, 1))
-            log_unc = anp.matmul(X, A_eps) + b_eps
-
-            #log_unc = anp.log(anp.exp(log_unc) + anp.exp(log_eps))
-            iunc2 = anp.exp(-2*log_unc)
-
-            L = anp.mean( (0.5*((anp.matmul(X, A) + b - Y)**2)*iunc2 + log_unc).sum(axis=1), axis=0 )
-            weights2 = (anp.sum(anp.square(A.ravel()))
-                        #+ anp.sum(anp.square(b.ravel()))
-                        #+ anp.sum(anp.square(A_eps.ravel()))
-                        #+ anp.sum(anp.square(b_eps.ravel()))
-                        )
-            return L + self.l/2 * weights2
-
-        def loss_history(x: np.ndarray) -> float:
-            """
-            Calculate the loss function and keep a history of it in training and testing.
-
-            Args:
-              x: Parameters flattened out.
-
-            Returns: The loss value.
-            """
-            l_train = loss(x, X, y)
-            l_test = loss(x, X_test, y_test)
-
-            self.loss_train += [l_train]
-            self.loss_test += [l_test]
-            return l_train
-
-        def loss_train(x: np.ndarray) -> float:
-            """
-            Calculate the loss function for the training dataset.
-
-            Args:
-              x: Parameters flattened out.
-
-            Returns: The loss value.
-            """
-            l_train = loss(x, X, y)
-            return l_train
-
-        grad_loss = grad(loss_train)
-        sc_op = fmin_l_bfgs_b(loss_history,
-                              x0,
-                              grad_loss,
-                              disp=True,
-                              factr=1e7,
-                              maxiter=100,
-                              iprint=0)
-
-        # Inference
-        self.A_inf = sc_op[0][:self.Nx*self.Ny].reshape(self.Nx, self.Ny)
-        self.b_inf = sc_op[0][self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape(1, self.Ny)
-        self.u_inf = sc_op[0][(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape(1, self.Ny) # removed np.exp
-        self.A_eps = sc_op[0][(self.Nx*self.Ny+self.Ny+self.Ny):].reshape(self.Nx, 1)
-
-    def predict(self, X: np.ndarray, return_std: bool=False) -> Union[Tuple[np.ndarray, np.ndarray], np.ndarray]:
-        """
-        Predict y from X.
-
-        Args:
-          X: Input dataset.
-
-        Returns: Predicted Y and, if return_std is True, also its uncertainty.
-        """
-
-        # result
-        y = (X @ self.A_inf + self.b_inf)
-        if not return_std:
-            return y
-        # flat uncertainty
-        y_unc = self.u_inf[0,:]
-        # input-dependent uncertainty
-        y_eps = np.exp(X @ self.A_eps + y_unc)
-        return y, y_eps
-
-
 def _fit_estimator(estimator, X: np.ndarray, y: np.ndarray):
     estimator = clone(estimator)
     estimator.fit(X, y)
@@ -436,7 +439,7 @@ def _fit_estimator(estimator, X: np.ndarray, y: np.ndarray):
 
 
 class MultiOutputWithStd(MetaEstimatorMixin, BaseEstimator):
-    def __init__(self, estimator, *, n_jobs=8):
+    def __init__(self, estimator, *, n_jobs=-1):
         self.estimator = estimator
         self.n_jobs = n_jobs
 
@@ -458,14 +461,12 @@ class MultiOutputWithStd(MetaEstimatorMixin, BaseEstimator):
                 "multi-output regression but has only one."
             )
 
-        print(f"Fitting multiple regressors with n_jobs={self.n_jobs}")
         self.estimators_ = Parallel(n_jobs=self.n_jobs)(
             delayed(_fit_estimator)(
                 self.estimator, X, y[:, i]
             )
             for i in range(y.shape[1])
        )
-        print("End of fit")
 
         return self
 
@@ -480,7 +481,6 @@ class MultiOutputWithStd(MetaEstimatorMixin, BaseEstimator):
             Multi-output targets predicted across multiple predictors.
             Note: Separate models are generated for each predictor.
         """
-        print("Inferring ...")
         y = Parallel(n_jobs=self.n_jobs)(
             delayed(e.predict)(X, return_std) for e in self.estimators_
             #delayed(e.predict)(X) for e in self.estimators_
diff --git a/pyproject.toml b/pyproject.toml
index c56a6230864e1bbcca2c91bae3ae3813d3b5038d..e9a6551854e69245c99be39249e5f51b9e599774 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -27,8 +27,8 @@ dynamic = ["version", "readme"]
 dependencies = [
     "numpy>=1.21",
     "scipy>=1.6",
-    "scikit-learn",
-    "autograd",
+    "scikit-learn>=1.0.2",
+    #"autograd",
     "h5py"
 ]
 