Skip to content
Snippets Groups Projects

Improve fit

Merged Danilo Enoque Ferreira de Lima requested to merge parallelization into main
2 files
+ 177
176
Compare changes
  • Side-by-side
  • Inline
Files
2
+ 176
175
@@ -13,6 +13,7 @@ from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.base import RegressorMixin
from sklearn.kernel_approximation import Nystroem
from sklearn.linear_model import ARDRegression
from sklearn.preprocessing import PolynomialFeatures
#from sklearn.svm import LinearSVR
#from sklearn.gaussian_process import GaussianProcessRegressor
#from sklearn.gaussian_process.kernels import DotProduct, WhiteKernel
@@ -34,160 +35,156 @@ from typing import Any, Dict, List, Optional, Union, Tuple
# * using evidence maximization as an iterative method
# * automatic relevance determination to reduce overtraining
#
#from autograd import numpy as anp
#from autograd import grad
#class FitModel(RegressorMixin, BaseEstimator):
# """
# Linear regression model with uncertainties.
#
# Args:
# l: Regularization coefficient.
# """
# def __init__(self, l: float=1e-6):
# self.l = l
#
# # model parameter sizes
# self.Nx: int = 0
# self.Ny: int = 0
#
# # fit result
# self.A_inf: Optional[np.ndarray] = None
# self.b_inf: Optional[np.ndarray] = None
# self.u_inf: Optional[np.ndarray] = None
#
# # fit monitoring
# self.loss_train: List[float] = list()
# self.loss_test: List[float] = list()
#
# self.input_data = None
#
# def fit(self, X: np.ndarray, y: np.ndarray, **fit_params) -> RegressorMixin:
# """
# Perform the fit and evaluate uncertainties with the test set.
#
# Args:
# X: The input.
# y: The target.
# fit_params: If it contains X_test and y_test, they are used to validate the model.
#
# Returns: The object itself.
# """
# if 'X_test' in fit_params and 'y_test' in fit_params:
# X_test = fit_params['X_test']
# y_test = fit_params['y_test']
# else:
# X_test = X
# y_test = y
#
# # model parameter sizes
# self.Nx: int = int(X.shape[1])
# self.Ny: int = int(y.shape[1])
#
# # initial parameter values
# A0: np.ndarray = np.eye(self.Nx, self.Ny).reshape(self.Nx*self.Ny)
# b0: np.ndarray = np.zeros(self.Ny)
# Aeps: np.ndarray = np.zeros(self.Nx)
# u0: np.ndarray = np.zeros(self.Ny)
# x0: np.ndarray = np.concatenate((A0, b0, u0, Aeps), axis=0)
#
# # reset loss monitoring
# self.loss_train: List[float] = list()
# self.loss_test: List[float] = list()
#
# def loss(x: np.ndarray, X: np.ndarray, Y: np.ndarray) -> float:
# """
# Calculate the loss function value for a given parameter set `x`.
#
# Args:
# x: The parameters as a flat array.
# X: The independent-variable dataset.
# Y: The dependent-variable dataset.
#
# Returns: The loss.
# """
# # diag( (in @ x - out) @ (in @ x - out)^T )
# A = x[:self.Nx*self.Ny].reshape((self.Nx, self.Ny))
# b = x[self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape((1, self.Ny))
#
# b_eps = x[(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape((1, self.Ny))
# A_eps = x[(self.Nx*self.Ny+self.Ny+self.Ny):].reshape((self.Nx, 1))
# log_unc = anp.matmul(X, A_eps) + b_eps
#
# #log_unc = anp.log(anp.exp(log_unc) + anp.exp(log_eps))
# iunc2 = anp.exp(-2*log_unc)
#
# L = anp.mean( (0.5*((anp.matmul(X, A) + b - Y)**2)*iunc2 + log_unc).sum(axis=1), axis=0 )
# weights2 = (anp.sum(anp.square(A.ravel()))
# #+ anp.sum(anp.square(b.ravel()))
# #+ anp.sum(anp.square(A_eps.ravel()))
# #+ anp.sum(anp.square(b_eps.ravel()))
# )
# return L + self.l/2 * weights2
#
# def loss_history(x: np.ndarray) -> float:
# """
# Calculate the loss function and keep a history of it in training and testing.
#
# Args:
# x: Parameters flattened out.
#
# Returns: The loss value.
# """
# l_train = loss(x, X, y)
# l_test = loss(x, X_test, y_test)
#
# self.loss_train += [l_train]
# self.loss_test += [l_test]
# return l_train
#
# def loss_train(x: np.ndarray) -> float:
# """
# Calculate the loss function for the training dataset.
#
# Args:
# x: Parameters flattened out.
#
# Returns: The loss value.
# """
# l_train = loss(x, X, y)
# return l_train
#
# grad_loss = grad(loss_train)
# sc_op = fmin_l_bfgs_b(loss_history,
# x0,
# grad_loss,
# disp=True,
# #factr=1e7,
# factr=10,
# maxiter=100,
# iprint=0)
#
# # Inference
# self.A_inf = sc_op[0][:self.Nx*self.Ny].reshape(self.Nx, self.Ny)
# self.b_inf = sc_op[0][self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape(1, self.Ny)
# self.u_inf = sc_op[0][(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape(1, self.Ny) # removed np.exp
# self.A_eps = sc_op[0][(self.Nx*self.Ny+self.Ny+self.Ny):].reshape(self.Nx, 1)
#
# def predict(self, X: np.ndarray, return_std: bool=False) -> Union[Tuple[np.ndarray, np.ndarray], np.ndarray]:
# """
# Predict y from X.
#
# Args:
# X: Input dataset.
#
# Returns: Predicted Y and, if return_std is True, also its uncertainty.
# """
#
# # result
# y = (X @ self.A_inf + self.b_inf)
# if not return_std:
# return y
# # flat uncertainty
# y_unc = self.u_inf[0,:]
# # input-dependent uncertainty
# y_eps = np.exp(X @ self.A_eps + y_unc)
# return y, y_eps
#
from autograd import numpy as anp
from autograd import grad
class FitModel(RegressorMixin, BaseEstimator):
"""
Linear regression model with uncertainties.
Args:
l: Regularization coefficient.
"""
def __init__(self, l: float=1e-6):
self.l = l
# model parameter sizes
self.Nx: int = 0
self.Ny: int = 0
# fit result
self.A_inf: Optional[np.ndarray] = None
self.b_inf: Optional[np.ndarray] = None
self.u_inf: Optional[np.ndarray] = None
# fit monitoring
self.loss_train: List[float] = list()
self.loss_test: List[float] = list()
self.input_data = None
def fit(self, X: np.ndarray, y: np.ndarray, **fit_params) -> RegressorMixin:
"""
Perform the fit and evaluate uncertainties with the test set.
Args:
X: The input.
y: The target.
fit_params: If it contains X_test and y_test, they are used to validate the model.
Returns: The object itself.
"""
if 'X_test' in fit_params and 'y_test' in fit_params:
X_test = fit_params['X_test']
y_test = fit_params['y_test']
else:
X_test = X
y_test = y
# model parameter sizes
self.Nx: int = int(X.shape[1])
self.Ny: int = int(y.shape[1])
# initial parameter values
A0: np.ndarray = np.eye(self.Nx, self.Ny).reshape(self.Nx*self.Ny)
b0: np.ndarray = np.zeros(self.Ny)
Aeps: np.ndarray = np.zeros(self.Nx)
u0: np.ndarray = np.zeros(self.Ny)
x0: np.ndarray = np.concatenate((A0, b0, u0, Aeps), axis=0)
# reset loss monitoring
self.loss_train: List[float] = list()
self.loss_test: List[float] = list()
def loss(x: np.ndarray, X: np.ndarray, Y: np.ndarray) -> float:
"""
Calculate the loss function value for a given parameter set `x`.
Args:
x: The parameters as a flat array.
X: The independent-variable dataset.
Y: The dependent-variable dataset.
Returns: The loss.
"""
# diag( (in @ x - out) @ (in @ x - out)^T )
A = x[:self.Nx*self.Ny].reshape((self.Nx, self.Ny))
b = x[self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape((1, self.Ny))
b_eps = x[(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape((1, self.Ny))
A_eps = x[(self.Nx*self.Ny+self.Ny+self.Ny):].reshape((self.Nx, 1))
log_unc = anp.matmul(X, A_eps) + b_eps
#log_unc = anp.log(anp.exp(log_unc) + anp.exp(log_eps))
iunc2 = anp.exp(-2*log_unc)
L = anp.mean( (0.5*((anp.matmul(X, A) + b - Y)**2)*iunc2 + log_unc).sum(axis=1), axis=0 )
#weights2 = anp.sum(anp.square(A.ravel()))
return L #+ self.l/2 * weights2
def loss_history(x: np.ndarray) -> float:
"""
Calculate the loss function and keep a history of it in training and testing.
Args:
x: Parameters flattened out.
Returns: The loss value.
"""
l_train = loss(x, X, y)
l_test = loss(x, X_test, y_test)
self.loss_train += [l_train]
self.loss_test += [l_test]
return l_train
def loss_train(x: np.ndarray) -> float:
"""
Calculate the loss function for the training dataset.
Args:
x: Parameters flattened out.
Returns: The loss value.
"""
l_train = loss(x, X, y)
return l_train
grad_loss = grad(loss_train)
sc_op = fmin_l_bfgs_b(loss_history,
x0,
grad_loss,
disp=True,
#factr=1e7,
factr=10,
maxiter=100,
iprint=0)
# Inference
self.A_inf = sc_op[0][:self.Nx*self.Ny].reshape(self.Nx, self.Ny)
self.b_inf = sc_op[0][self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape(1, self.Ny)
self.u_inf = sc_op[0][(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape(1, self.Ny) # removed np.exp
self.A_eps = sc_op[0][(self.Nx*self.Ny+self.Ny+self.Ny):].reshape(self.Nx, 1)
def predict(self, X: np.ndarray, return_std: bool=False) -> Union[Tuple[np.ndarray, np.ndarray], np.ndarray]:
"""
Predict y from X.
Args:
X: Input dataset.
Returns: Predicted Y and, if return_std is True, also its uncertainty.
"""
# result
y = (X @ self.A_inf + self.b_inf)
if not return_std:
return y
# flat uncertainty
y_unc = self.u_inf[0,:]
# input-dependent uncertainty
y_eps = np.exp(X @ self.A_eps + y_unc)
return y, y_eps
def matching_ids(a: np.ndarray, b: np.ndarray, c: np.ndarray) -> np.ndarray:
@@ -617,10 +614,14 @@ class Model(TransformerMixin, BaseEstimator):
('pca', PCA(n_pca_hr, whiten=True)),
('unc', UncertaintyHolder()),
])
#self.fit_model = FitModel()
self.fit_model = MultiOutputWithStd(ARDRegression(n_iter=30, tol=1e-5, verbose=True))
self.fit_model = Pipeline([
#('fit', MultiOutputWithStd(ARDRegression(n_iter=30, tol=1e-6, verbose=True), n_jobs=10)),
('fit', FitModel()),
])
self.channel_mean = {ch: np.nan for ch in channels}
self.channel_pca = {ch: PCA(n_pca_lr, whiten=True)
for ch in channels}
#self.channel_relevance = {ch: np.nan for ch in channels}
@@ -705,23 +706,15 @@ class Model(TransformerMixin, BaseEstimator):
self.x_model['unc'].set_uncertainty(low_pca_unc)
# for consistency check
self.x_pca_mean = np.mean(low_pca, axis=0, keepdims=True)
self.x_pca_std = np.std(low_pca, axis=0, keepdims=True)
self.x_pca_mean = np.mean(low_res - low_pca_rec, axis=0, keepdims=True)
self.x_pca_std = np.std(low_res - low_pca_rec, axis=0, keepdims=True)
# for consistency check per channel
#selection_model = self.x_model['select']
#for channel in self.get_channels():
# self.channel_mean[channel] = np.mean(low_res_data[channel], axis=0, keepdims=True)
# print(f"Calculate PCA relevance on {channel}")
# # freeze input data in one channel only
# low_res_data_frozen = {ch: low_res_data[ch] if ch != channel
# else np.repeat(self.channel_mean[channel], low_res_data[ch].shape[0], axis=0)
# for ch in self.get_channels()}
# low_res = selection_model.transform(low_res_data_frozen)
# low_pca = pca_model.fit_transform(low_res)
# low_pca_rec = pca_model.inverse_transform(low_pca)
# low_pca_unc = np.mean(np.sqrt(np.mean((low_res - low_pca_rec)**2, axis=1, keepdims=True)), axis=0, keepdims=True)
# self.channel_relevance[channel] = low_pca_unc
selection_model = self.x_model['select']
low_res_selected = selection_model.transform(low_res_data, keep_dictionary_structure=True)
for channel in self.get_channels():
print(f"Calculate PCA on {channel}")
low_pca = self.channel_pca[channel].fit_transform(low_res_selected[channel])
print("End of fit.")
return high_res
@@ -766,8 +759,12 @@ class Model(TransformerMixin, BaseEstimator):
channels = list(low_res_data.keys())
# check if each channel is close to the mean
low_res_selected = selection_model.transform(low_res_data, keep_dictionary_structure=True)
deviation = {ch: (low_res_selected[ch] - selection_model.mean[ch])/selection_model.std[ch]
# decorrelate it
deviation = {ch: self.channel_pca[ch].transform(low_res_selected[ch])
for ch in channels}
# just whiten it
#deviation = {ch: (low_res_selected[ch] - selection_model.mean[ch])/selection_model.std[ch]
# for ch in channels}
ndof = {ch: float(deviation[ch].shape[1] - 1)
for ch in channels}
chi2 = {ch: np.sum(deviation[ch]**2, axis=1, keepdims=True)
@@ -809,15 +806,15 @@ class Model(TransformerMixin, BaseEstimator):
if 'fex' in self.x_model.named_steps:
pca_model = self.x_model['fex'].named_steps['prepca']
low_pca = pca_model.transform(low_res)
low_pca_rec = pca_model.inverse_transform(low_pca)
deviation = (low_pca - self.x_pca_mean)/self.x_pca_std
deviation = (low_pca_rec - low_res - self.x_pca_mean)/self.x_pca_std
ndof = float(deviation.shape[1] - 1)
chi2 = np.sum(deviation**2, axis=1, keepdims=True)
chi2_mu = scipy.stats.chi2.mean(ndof)
chi2_sigma = scipy.stats.chi2.std(ndof)
return (chi2 - chi2_mu)/chi2_sigma
#low_pca_rec = pca_model.inverse_transform(low_pca)
#low_pca_unc = self.x_model['unc'].uncertainty()
#fig = plt.figure(figsize=(8, 16))
@@ -879,6 +876,7 @@ class Model(TransformerMixin, BaseEstimator):
DataHolder(self.channel_mean),
DataHolder(self.x_pca_mean),
DataHolder(self.x_pca_std),
self.channel_pca,
#DataHolder(self.channel_relevance)
], filename, compress='zlib')
@@ -892,7 +890,9 @@ class Model(TransformerMixin, BaseEstimator):
Returns: A new model object.
"""
x_model, y_model, fit_model, channel_mean, x_pca_mean, x_pca_std = joblib.load(filename)
(x_model, y_model, fit_model,
channel_mean, x_pca_mean, x_pca_std,
channel_pca) = joblib.load(filename)
obj = Model()
obj.x_model = x_model
obj.y_model = y_model
@@ -900,6 +900,7 @@ class Model(TransformerMixin, BaseEstimator):
obj.channel_mean = channel_mean.get_data()
obj.x_pca_mean = x_pca_mean.get_data()
obj.x_pca_std = x_pca_std.get_data()
obj.channel_pca = channel_pca
#obj.channel_relevance = channel_relevance.get_data()
return obj
Loading