Commit df5e42c9 authored by Danilo Enoque Ferreira de Lima

Merge branch 'nystroem' into 'main'

Added kernel approximation with the Nystroem sub-space projection method as an alternative

See merge request !3
parents f3d98448 9f64ef68
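
Background for the diff below: Nystroem (sklearn.kernel_approximation.Nystroem) approximates a kernel feature map by evaluating the kernel against a random subset of training samples and projecting onto the subspace they span, which scales far better than exact KernelPCA on large training sets. A minimal sketch of the new preprocessing path, with shapes and component counts chosen for illustration rather than taken from this repository:

import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.decomposition import PCA
from sklearn.kernel_approximation import Nystroem

X = np.random.rand(500, 64)  # illustrative low-resolution spectra
fex = Pipeline([('prepca', PCA(20, whiten=True)),           # compress first
                ('nystroem', Nystroem(kernel='rbf',
                                      n_components=100))])  # approximate RBF feature map
Z = fex.fit_transform(X)  # (500, 100) nonlinear features fed to the regressor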
@@ -8,16 +8,24 @@ from autograd import grad
 from scipy.signal import fftconvolve
 from scipy.signal import find_peaks_cwt
 from scipy.optimize import fmin_l_bfgs_b
-from sklearn.decomposition import KernelPCA, PCA
+from sklearn.decomposition import PCA
 from sklearn.pipeline import Pipeline, FeatureUnion
 from sklearn.base import TransformerMixin, BaseEstimator
 from sklearn.base import RegressorMixin
+from sklearn.kernel_approximation import Nystroem
+from sklearn.linear_model import ARDRegression
+#from sklearn.svm import LinearSVR
+#from sklearn.gaussian_process import GaussianProcessRegressor
+#from sklearn.gaussian_process.kernels import DotProduct, WhiteKernel
 from itertools import product
+from sklearn.model_selection import train_test_split
+from sklearn.base import clone, MetaEstimatorMixin
+from joblib import Parallel, delayed
 import matplotlib.pyplot as plt
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Union, Tuple
 
 def matching_ids(a: np.ndarray, b: np.ndarray, c: np.ndarray) -> np.ndarray:
     """Returns list of train IDs common to sets a, b and c."""
@@ -263,8 +271,13 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
 class FitModel(RegressorMixin, BaseEstimator):
     """
     Linear regression model with uncertainties.
+
+    Args:
+      l: Regularization coefficient.
     """
-    def __init__(self):
+    def __init__(self, l: float=1e-6):
+        self.l = l
         # model parameter sizes
         self.Nx: int = 0
         self.Ny: int = 0
@@ -304,8 +317,8 @@ class FitModel(RegressorMixin, BaseEstimator):
         # initial parameter values
         A0: np.ndarray = np.eye(self.Nx, self.Ny).reshape(self.Nx*self.Ny)
-        Aeps: np.ndarray = np.zeros(self.Nx)
         b0: np.ndarray = np.zeros(self.Ny)
+        Aeps: np.ndarray = np.zeros(self.Nx)
         u0: np.ndarray = np.zeros(self.Ny)
         x0: np.ndarray = np.concatenate((A0, b0, u0, Aeps), axis=0)
@@ -330,12 +343,18 @@ class FitModel(RegressorMixin, BaseEstimator):
             b_eps = x[(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape((1, self.Ny))
             A_eps = x[(self.Nx*self.Ny+self.Ny+self.Ny):].reshape((self.Nx, 1))
-            log_unc = X @ A_eps + b_eps
+            log_unc = anp.matmul(X, A_eps) + b_eps
             #log_unc = anp.log(anp.exp(log_unc) + anp.exp(log_eps))
             iunc2 = anp.exp(-2*log_unc)
-            return anp.mean( (0.5*((X @ A + b - Y)**2)*iunc2 + log_unc).sum(axis=1), axis=0 ) # Put RELU on (XX@x) and introduce new matrix W
+            L = anp.mean( (0.5*((anp.matmul(X, A) + b - Y)**2)*iunc2 + log_unc).sum(axis=1), axis=0 )
+            weights2 = (anp.sum(anp.square(A.ravel()))
+                        #+ anp.sum(anp.square(b.ravel()))
+                        #+ anp.sum(anp.square(A_eps.ravel()))
+                        #+ anp.sum(anp.square(b_eps.ravel()))
+                       )
+            return L + self.l/2 * weights2
 
         def loss_history(x: np.ndarray) -> float:
             """
@@ -371,7 +390,7 @@ class FitModel(RegressorMixin, BaseEstimator):
                                       grad_loss,
                                       disp=True,
                                       factr=1e7,
-                                      maxiter=1000,
+                                      maxiter=100,
                                       iprint=0)
 
         # Inference
@@ -380,60 +399,85 @@ class FitModel(RegressorMixin, BaseEstimator):
         self.u_inf = sc_op[0][(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape(1, self.Ny) # removed np.exp
         self.A_eps = sc_op[0][(self.Nx*self.Ny+self.Ny+self.Ny):].reshape(self.Nx, 1)
 
-    def as_dict(self) -> Dict[str, Any]:
-        """
-        Export relevant variables to rebuild this object from a simple dictionary.
-
-        Returns: Dictionary with all relevant variables.
-        """
-        return dict(
-                    A_inf=self.A_inf,
-                    b_inf=self.b_inf,
-                    u_inf=self.u_inf,
-                    A_eps=self.A_eps,
-                    loss_train=self.loss_train,
-                    loss_test=self.loss_test,
-                    Nx=self.Nx,
-                    Ny=self.Ny)
-
-    def from_dict(self, in_dict: Dict[str, Any]):
-        """
-        Rebuild this object from a dictionary.
-
-        Args:
-          in_dict: The input dictionary with relevant variables.
-        """
-        self.A_inf = in_dict["A_inf"]
-        self.b_inf = in_dict["b_inf"]
-        self.u_inf = in_dict["u_inf"]
-        self.A_eps = in_dict["A_eps"]
-        self.loss_train = in_dict["loss_train"]
-        self.loss_test = in_dict["loss_test"]
-        self.Nx = in_dict["Nx"]
-        self.Ny = in_dict["Ny"]
-
-    def predict(self, X: np.ndarray) -> Dict[str, np.ndarray]:
-        """
-        Predict y from X.
-
-        Args:
-          X: Input dataset.
-
-        Returns: Predicted Y.
-        """
-        result = dict()
-        # result
-        result["Y"] = (X @ self.A_inf + self.b_inf)
-        # flat uncertainty
-        result["Y_unc"] = self.u_inf[0,:]
-        # input-dependent uncertainty
-        result["Y_eps"] = np.exp(X @ self.A_eps + result["Y_unc"])
-        return result
+    def predict(self, X: np.ndarray, return_std: bool=False) -> Union[Tuple[np.ndarray, np.ndarray], np.ndarray]:
+        """
+        Predict y from X.
+
+        Args:
+          X: Input dataset.
+
+        Returns: Predicted Y and, if return_std is True, also its uncertainty.
+        """
+        # result
+        y = (X @ self.A_inf + self.b_inf)
+        if not return_std:
+            return y
+        # flat uncertainty
+        y_unc = self.u_inf[0,:]
+        # input-dependent uncertainty
+        y_eps = np.exp(X @ self.A_eps + y_unc)
+        return y, y_eps
+
+def _fit_estimator(estimator, X: np.ndarray, y: np.ndarray):
+    estimator = clone(estimator)
+    estimator.fit(X, y)
+    return estimator
+
+class MultiOutputWithStd(MetaEstimatorMixin, BaseEstimator):
+    def __init__(self, estimator, *, n_jobs=None):
+        self.estimator = estimator
+        self.n_jobs = n_jobs
+
+    def fit(self, X: np.ndarray, y: np.ndarray):
+        """Fit the model to data, separately for each output variable.
+
+        Args:
+          X: {array-like, sparse matrix} of shape (n_samples, n_features)
+              The input data.
+          y: {array-like, sparse matrix} of shape (n_samples, n_outputs)
+              Multi-output targets. An indicator matrix turns on multilabel
+              estimation.
+
+        Returns: self.
+        """
+        if y.ndim == 1:
+            raise ValueError(
+                "y must have at least two dimensions for "
+                "multi-output regression but has only one."
+            )
+        self.estimators_ = Parallel(n_jobs=self.n_jobs)(
+            delayed(_fit_estimator)(
+                self.estimator, X, y[:, i]
+            )
+            for i in range(y.shape[1])
+        )
+        return self
+
+    def predict(self, X: np.ndarray, return_std: bool=False) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
+        """Predict multi-output variable using model for each target variable.
+
+        Args:
+          X: {array-like, sparse matrix} of shape (n_samples, n_features)
+              The input data.
+
+        Returns: {array-like, sparse matrix} of shape (n_samples, n_outputs)
+            Multi-output targets predicted across multiple predictors.
+            Note: Separate models are generated for each predictor.
+        """
+        y = Parallel(n_jobs=self.n_jobs)(
+            delayed(e.predict)(X, return_std) for e in self.estimators_
+            #delayed(e.predict)(X) for e in self.estimators_
+        )
+        if return_std:
+            y, unc = zip(*y)
+            return np.asarray(y).T, np.asarray(unc).T
+        return np.asarray(y).T
 
 class Model(TransformerMixin, BaseEstimator):
     """
@@ -451,7 +495,7 @@ class Model(TransformerMixin, BaseEstimator):
         Set to None to perform no selection.
       validation_size: Fraction (number between 0 and 1) of the data to take for
         validation and systematic uncertainty estimate.
-      n_pca_nonlinear: Number of nonlinear PCA components added at the preprocessing stage
+      n_nonlinear_kernel: Number of nonlinear kernel components added at the preprocessing stage
         to obtain nonlinearities as an input and improve the prediction.
     """
@@ -464,31 +508,26 @@ class Model(TransformerMixin, BaseEstimator):
                  tof_start: Optional[int]=None,
                  delta_tof: Optional[int]=300,
                  validation_size: float=0.05,
-                 n_pca_nonlinear: int=0):
+                 n_nonlinear_kernel: int=10000):
         # models
-        if n_pca_nonlinear <= 0:
-            x_pca_model = PCA(n_pca_lr, whiten=True)
-        else:
-            x_pca_model = FeatureUnion([
-                              ('linear', PCA(n_pca_lr, whiten=True)),
-                              ('nonlinear', Pipeline([('prep', PCA(n_pca_lr, whiten=True)),
-                                                      ('kpca', KernelPCA(n_pca_nonlinear,
-                                                                         kernel='rbf',
-                                                                         gamma=0.1,
-                                                                         n_jobs=-1)),
-                                                     ])),
-                          ])
-        self.x_model = Pipeline([
-                                ('select', SelectRelevantLowResolution(channels, tof_start, delta_tof)),
-                                ('pca', x_pca_model),
-                                ('unc', UncertaintyHolder()),
-                                ])
+        x_model_steps = list()
+        x_model_steps += [('select', SelectRelevantLowResolution(channels, tof_start, delta_tof))]
+        if n_nonlinear_kernel > 0:
+            x_model_steps += [('fex', Pipeline([('prepca', PCA(n_pca_lr, whiten=True)),
+                                                ('nystroem', Nystroem(n_components=n_nonlinear_kernel, kernel='rbf', gamma=None, n_jobs=8)),
+                                               ]))]
+        x_model_steps += [
+                          ('pca', PCA(n_pca_lr, whiten=True)),
+                          ('unc', UncertaintyHolder()),
+                         ]
+        self.x_model = Pipeline(x_model_steps)
         self.y_model = Pipeline([
                                 ('smoothen', HighResolutionSmoother(high_res_sigma)),
-                                ('pca', PCA(n_pca_hr, whiten=False)),
+                                ('pca', PCA(n_pca_hr, whiten=True)),
                                 ('unc', UncertaintyHolder()),
                                 ])
-        self.fit_model = FitModel()
+        #self.fit_model = FitModel()
+        self.fit_model = MultiOutputWithStd(ARDRegression(n_iter=30, tol=1e-4, verbose=True))
         # size of the test subset
         self.validation_size = validation_size
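
Two details of the new construction are worth noting. With gamma=None, scikit-learn's Nystroem falls back to the RBF default gamma = 1/n_features, so the kernel width adapts to the post-PCA dimensionality. And replacing FitModel with MultiOutputWithStd(ARDRegression(...)) swaps the hand-rolled heteroscedastic fit for one Bayesian linear model per PCA component, whose posterior standard deviation supplies the uncertainty that predict(..., return_std=True) propagates downstream.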
@@ -532,6 +571,7 @@ class Model(TransformerMixin, BaseEstimator):
         """
         x_t = self.x_model.fit_transform(low_res_data)
         y_t = self.y_model.fit_transform(high_res_data, smoothen__energy=high_res_photon_energy)
+        #self.fit_model.set_params(fex__gamma=1.0/float(x_t.shape[0]))
         self.fit_model.fit(x_t, y_t)
 
         # calculate the effect of the PCA
@@ -542,12 +582,11 @@ class Model(TransformerMixin, BaseEstimator):
         self.y_model['unc'].set_uncertainty(high_pca_unc)
 
         low_res = self.x_model['select'].transform(low_res_data)
-        low_pca = self.x_model['pca'].transform(low_res)
-        if isinstance(self.x_model['pca'], FeatureUnion):
-            n = self.x_model['pca'].transformer_list[0][1].n_components
-            low_pca_rec = self.x_model['pca'].transformer_list[0][1].inverse_transform(low_pca[:, :n])
-        else:
-            low_pca_rec = self.x_model['pca'].inverse_transform(low_pca)
+        pca_model = self.x_model['pca']
+        if 'fex' in self.x_model.named_steps:
+            pca_model = self.x_model['fex'].named_steps['prepca']
+        low_pca = pca_model.transform(low_res)
+        low_pca_rec = pca_model.inverse_transform(low_pca)
         low_pca_unc = np.mean(np.sqrt(np.mean((low_res - low_pca_rec)**2, axis=1, keepdims=True)), axis=0, keepdims=True)
         self.x_model['unc'].set_uncertainty(low_pca_unc)
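
The pca_model indirection is needed because the Nystroem feature map is not invertible: when the 'fex' step is present, the reconstruction error that feeds the input uncertainty must be computed from its internal 'prepca' PCA, the last stage that still supports inverse_transform back to the detector channels.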
@@ -564,12 +603,11 @@ class Model(TransformerMixin, BaseEstimator):
         Returns: Ratio of root-mean-squared-error of the data reconstruction using the existing PCA model and the one from the original model.
         """
         low_res = self.x_model['select'].transform(low_res_data)
-        low_pca = self.x_model['pca'].transform(low_res)
-        if isinstance(self.x_model['pca'], FeatureUnion):
-            n = self.x_model['pca'].transformer_list[0][1].n_components
-            low_pca_rec = self.x_model['pca'].transformer_list[0][1].inverse_transform(low_pca[:, :n])
-        else:
-            low_pca_rec = self.x_model['pca'].inverse_transform(low_pca)
+        pca_model = self.x_model['pca']
+        if 'fex' in self.x_model.named_steps:
+            pca_model = self.x_model['fex'].named_steps['prepca']
+        low_pca = pca_model.transform(low_res)
+        low_pca_rec = pca_model.inverse_transform(low_pca)
         low_pca_unc = self.x_model['unc'].uncertainty()
 
         #fig = plt.figure(figsize=(8, 16))
@@ -604,10 +642,12 @@ class Model(TransformerMixin, BaseEstimator):
           a (1, energy channel) array for the PCA syst. uncertainty in key "pca".
         """
         low_pca = self.x_model.transform(low_res_data)
-        high_pca = self.fit_model.predict(low_pca)
-        n_trains = high_pca["Y"].shape[0]
-        pca_y = np.concatenate((high_pca["Y"],
-                                high_pca["Y"] + high_pca["Y_eps"]),
+        high_pca, high_pca_unc = self.fit_model.predict(low_pca, return_std=True)
+        #high_pca = self.fit_model.predict(low_pca)
+        #high_pca_unc = 0
+        n_trains = high_pca.shape[0]
+        pca_y = np.concatenate((high_pca,
+                                high_pca + high_pca_unc),
                                axis=0)
         high_res_predicted = self.y_model["pca"].inverse_transform(pca_y)
         expected = high_res_predicted[:n_trains, :]
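
Rather than propagating the covariance analytically, the code pushes the uncertainty through the inverse PCA by stacking the mean spectra and the mean shifted by one standard deviation, inverse-transforming both halves in a single call; expected holds the first half, and the difference from the second half (collapsed in this view) presumably serves as the high-resolution error estimate.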
......
#!/usr/bin/env python
import sys
sys.path.append('.')
sys.path.append('..')
import numpy as np
......