Includes input energy parameter in the model and adds non-linearities

Merged Danilo Enoque Ferreira de Lima requested to merge with_energy into main
1 file changed: +50 −306
@@ -4,262 +4,23 @@ import joblib
import numpy as np
from scipy.signal import fftconvolve
#from scipy.signal import find_peaks_cwt
from scipy.optimize import fmin_l_bfgs_b
from sklearn.decomposition import IncrementalPCA, PCA
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.base import RegressorMixin
from sklearn.base import OutlierMixin
from sklearn.pipeline import Pipeline
from sklearn.kernel_approximation import Nystroem
#from sklearn.linear_model import ARDRegression
from sklearn.linear_model import BayesianRidge
#from sklearn.covariance import EllipticEnvelope
from sklearn.metrics import accuracy_score
from scipy.stats import gaussian_kde
from functools import reduce
from itertools import product
from time import time_ns
from sklearn.base import clone, MetaEstimatorMixin
from joblib import Parallel, delayed
from copy import deepcopy
from typing import Any, Dict, List, Optional, Union, Tuple
# Previous approach: a single-matrix model fitted by gradient descent.
# ARDRegression can be used instead. Advantages:
# * parallelization
# * evidence maximization as an iterative method
# * automatic relevance determination to reduce overtraining
#
from autograd import numpy as anp
from autograd import grad
from pes_to_spec.bnn import BNNModel
class FitModel(RegressorMixin, BaseEstimator):
"""
Linear regression model with uncertainties.
Args:
"""
def __init__(self):
# model parameter sizes
self.Nx: int = 0
self.Ny: int = 0
# fit result
self.pars: Optional[Dict[str, np.ndarray]] = None
self.structure: Optional[Dict[str, Tuple[int, int]]] = None
# fit monitoring
self.loss_train: List[float] = list()
self.loss_test: List[float] = list()
self.nll_train: Optional[np.ndarray] = None
self.nll_train_expected: Optional[np.ndarray] = None
def fit(self, X: np.ndarray, y: np.ndarray, **fit_params) -> RegressorMixin:
"""
Perform the fit and evaluate uncertainties with the test set.
Args:
X: The input.
y: The target.
fit_params: If it contains X_test and y_test, they are used to validate the model.
Returns: The object itself.
"""
if 'X_test' in fit_params and 'y_test' in fit_params:
X_test = fit_params['X_test']
y_test = fit_params['y_test']
else:
X_test = X
y_test = y
# model parameter sizes
self.Nx: int = int(X.shape[1])
self.Ny: int = int(y.shape[1])
# initial parameter values
self.structure = dict(A_inf=(self.Nx, self.Ny),
b_inf=(1, self.Ny),
Ap_inf=(self.Nx, self.Ny),
log_inv_alpha=(1, self.Ny),
log_inv_alpha_prime=(self.Nx, 1),
#log_inv_alpha_prime2=(self.Nx, 1),
#log_inv_tau1=(1, 1),
#log_inv_tau2=(1, 1),
)
# initialize close to the solution:
# pinv(X) @ y is the least-squares solution of Ax = y;
# b is initialized to the mean of y, so A fits the mean-subtracted target;
# assume a small initial noise uncertainty in alpha
init_method = dict(A_inf=lambda shape: np.linalg.pinv(X) @ (y - np.mean(y, axis=0, keepdims=True)),
b_inf=lambda shape: np.mean(y, axis=0, keepdims=True),
Ap_inf=lambda shape: np.zeros(shape),
log_inv_alpha=lambda shape: 1.0*np.ones(shape),
log_inv_alpha_prime=lambda shape: 1.0*np.ones(shape),
#log_inv_tau1=lambda shape: 1.0*np.ones(shape),
#log_inv_tau2=lambda shape: 1.0*np.ones(shape),
)
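# Starting from the pseudo-inverse lands near the optimum: for the linear
# term alone, A = pinv(X) @ (y - mean(y)) minimizes the squared error, so
# the optimizer mainly refines the quadratic term and the noise model.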
x0: np.ndarray = FitModel.get_pars_init(self.structure, init_method)
# reset loss monitoring
self.loss_train: List[float] = list()
self.loss_test: List[float] = list()
def loss(x: np.ndarray, X: np.ndarray, Y: np.ndarray) -> float:
"""
Calculate the loss function value for a given parameter set `x`.
Args:
x: The parameters as a flat array.
X: The independent-variable dataset.
Y: The dependent-variable dataset.
Returns: The loss.
"""
p = FitModel.get_pars(x, self.structure)
return anp.mean(self.nll(X, Y, pars=p), axis=0)
def loss_history(x: np.ndarray) -> float:
"""
Calculate the loss function and keep a history of it in training and testing.
Args:
x: Parameters flattened out.
Returns: The loss value.
"""
l_train = loss(x, X, y)
l_test = loss(x, X_test, y_test)
self.loss_train += [l_train]
self.loss_test += [l_test]
return l_train
def loss_train(x: np.ndarray) -> float:
"""
Calculate the loss function for the training dataset.
Args:
x: Parameters flattened out.
Returns: The loss value.
"""
l_train = loss(x, X, y)
return l_train
grad_loss = grad(loss_train)
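# loss_history is the objective (it also records the train/test losses) and
# autograd supplies its exact gradient; factr=1e7 is SciPy's "moderate
# accuracy" stopping tolerance, roughly 1e7 times machine epsilon.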
sc_op = fmin_l_bfgs_b(loss_history,
x0,
grad_loss,
disp=True,
factr=1e7,
#factr=10,
maxiter=10000,
iprint=0)
# Inference
self.pars = FitModel.get_pars(sc_op[0], self.structure)
self.nll_train = sc_op[1]
self.nll_train_expected = np.mean(self.nll(X, pars=self.pars), axis=0, keepdims=True)
return self
def predict(self, X: np.ndarray, return_std: bool=False) -> Union[Tuple[np.ndarray, np.ndarray], np.ndarray]:
"""
Predict y from X.
Args:
X: Input dataset.
return_std: Whether to also return the input-dependent uncertainty.
Returns: Predicted Y and, if return_std is True, also its uncertainty.
"""
# result
A = self.pars["A_inf"]
b = self.pars["b_inf"]
X2 = anp.square(X)
Ap = self.pars["Ap_inf"]
y = anp.matmul(X2, Ap) + anp.matmul(X, A) + b
if not return_std:
return y
# input-dependent uncertainty
log_inv_alpha = anp.matmul(X, self.pars["log_inv_alpha_prime"]) + self.pars["log_inv_alpha"]
sqrt_inv_alpha = anp.exp(0.5*log_inv_alpha)
return y, sqrt_inv_alpha
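# Usage sketch: y_mean, y_std = model.predict(X, return_std=True)
# y_std = exp(0.5*(X @ log_inv_alpha_prime + log_inv_alpha)) is the learned
# input-dependent Laplace scale, not a Gaussian standard deviation.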
@staticmethod
def get_pars(x: np.ndarray, structure: Dict[str, Tuple[int, int]]) -> Dict[str, np.ndarray]:
pars = dict()
s = 0
for key, value in structure.items():
n = value[0]*value[1]
e = s + n
pars[key] = x[s:e].reshape(value)
s += n
return pars
@staticmethod
def get_pars_size(structure: Dict[str, Tuple[int, int]]) -> int:
# the total parameter count is the sum (not the product) of the block sizes
return sum(value[0]*value[1] for value in structure.values())
@staticmethod
def get_pars_init(structure: Dict[str, Tuple[int, int]], init_method: Optional[Dict[str, Any]]=None) -> np.ndarray:
if init_method is None:
init_method = dict()
init = {key: init_method[key](value).reshape(-1) if key in init_method
else np.zeros(value).reshape(-1)
for key, value in structure.items()}
return np.concatenate(list(init.values()), axis=0)
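# Flatten/unflatten round-trip sketch (hypothetical shapes, not from the code):
# structure = dict(A=(2, 3), b=(1, 3))
# x0 = FitModel.get_pars_init(structure) # zeros, length 2*3 + 1*3 = 9
# pars = FitModel.get_pars(x0, structure) # pars["A"].shape == (2, 3)
# assert x0.size == FitModel.get_pars_size(structure)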
def nll(self, X: np.ndarray, Y: Optional[np.ndarray]=None, pars: Optional[Dict[str, np.ndarray]]=None) -> np.ndarray:
"""
To estimate p(M|X) = int_Y p(M|X,Y)p(Y) dY, we assume p(Y) is Normal(mean(X), std(X)).
p(M|X,Y=Normal(mu(X), var(X))) = 1/2b exp(-(mu(X)-mu(X))/b) = 1/2b.
-log p = log(2) + log(b)
We return -log [p(M|X,Y=mu(X))/p(M|X_train,Y_train)] = -log p(M|X,Y=mu(X)) + log p(M|X_train, Y_traun)
Negative log likelihood L allows one
Args:
X: The input data.
Y: The true result. If None, set it to the expectation.
Returns: negative log likelihood.
"""
if pars is None:
pars = self.pars
A = pars["A_inf"]
b = pars["b_inf"]
X2 = anp.square(X)
Ap = pars["Ap_inf"]
Y_pred = anp.matmul(X2, Ap) + anp.matmul(X, A) + b
log_inv_alpha = anp.matmul(X, pars["log_inv_alpha_prime"]) + pars["log_inv_alpha"]
sqrt_alpha = anp.exp(-0.5*log_inv_alpha)
#log_inv_tau1 = pars["log_inv_tau1"]
#tau1 = anp.exp(-log_inv_tau1)
#log_inv_tau2 = pars["log_inv_tau2"]
#tau2 = anp.exp(-log_inv_tau2)
if Y is None:
Y = self.predict(X)
# likelihood: Laplace, p(Y|X) = 1/(2b) exp(-|Y_pred - Y|/b)
# the scale b = exp(0.5*(X @ log_inv_alpha_prime + log_inv_alpha)) makes the aleatoric uncertainty data-dependent
L = anp.sum((anp.fabs(Y_pred - Y))*sqrt_alpha + 0.5*log_inv_alpha, axis=1)
# prior p(A_inf) p(b_inf) = p(A_inf) cte. (assume all b equally likely)
# A has normal prior with var = 1/tau
# 1/sqrt(2pi)1/sqrt(sigma**2) exp(-0.5 A**2/sigma**2)
# log p = -0.5 A **2/sigma**2 - 0.5 log sigma**2 - 0.5 log 2pi
# - log p = 0.5 A**2/sigma**2 + 0.5 log sigma**2 + 0.5 log 2pi
# 0.5 log sigma**2 = 0.5 log 1/tau
#L_prior = anp.sum(0.5*anp.square(pars["A_inf"].ravel())*tau.ravel() + 0.5*log_inv_tau.ravel())
#L_prior = (anp.sum(0.5*anp.square(A.ravel())*tau1 + 0.5*log_inv_tau1)
# + anp.sum(0.5*anp.square(Ap.ravel())*tau2 + 0.5*log_inv_tau2)
# )
#alpha = 2.0
#beta = 2.0
#L_hyper = anp.sum((alpha - 1)*log_inv_tau1 + beta*tau1
# + (alpha - 1)*log_inv_tau2 + beta*tau2
# )
return L
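# Worked example of the NLL above (illustrative numbers, not from data):
# for one output with Y_pred - Y = 2.0 and log_inv_alpha = 0, the scale is
# b = exp(0.5*0) = 1, so the contribution is |2.0|*1 + 0.5*0 = 2.0,
# i.e. |residual|/b + log b, the Laplace NLL up to the constant log 2.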
def matching_ids(a: np.ndarray, b: np.ndarray, c: np.ndarray) -> np.ndarray:
"""Returns list of train IDs common to sets a, b and c."""
@@ -478,7 +239,10 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
self.mean = dict()
self.std = dict()
def transform(self, X: Dict[str, np.ndarray],
keep_dictionary_structure: bool=False,
pulse_spacing: Optional[Dict[str, List[int]]]=None,
pulse_energy: Optional[np.ndarray]=None) -> Union[np.ndarray, Dict[str, np.ndarray]]:
"""
Get a dictionary with the channel names for the input low-resolution data and output
only the relevant input data in an array.
@@ -488,6 +252,7 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
where i is a number between 1 and 4 and k is a letter between A and D.
keep_dictionary_structure: Whether to concatenate all channels, or keep them as a dictionary.
pulse_spacing: Distances between pulses in multi-pulse data. If there is only one pulse, set it to a list containing only the element zero.
pulse_energy: Pulse energy.
Returns: Concatenated and pre-processed low-resolution data of shape (train_id, pulse_id, features).
"""
@@ -503,8 +268,10 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
for channel, item in X.items()}
if not keep_dictionary_structure:
selected = list(y.values())
if pulse_energy is not None:
selected += [pulse_energy[:, np.newaxis, :]]
if self.poly:
selected += [pulse_energy[:, np.newaxis, :]*v for v in y.values()]
return np.concatenate(selected, axis=-1)
return y
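# Shape sketch (hypothetical sizes): each selected channel has shape
# (B, P, F); pulse_energy is assumed to be (B, 1), so pulse_energy[:, np.newaxis, :]
# has shape (B, 1, 1) and concatenates as one extra feature when P == 1,
# while the products pulse_energy*v append energy-scaled copies of every
# channel, the added non-linearity named in the merge-request title.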
@@ -750,34 +517,24 @@ class Model(TransformerMixin, BaseEstimator):
Set to None to perform no selection.
validation_size: Fraction (number between 0 and 1) of the data to take for
validation and systematic uncertainty estimate.
bnn: Whether to use a Bayesian neural network (BNN) as the regression model.
"""
def __init__(self,
channels:List[str]=[f"channel_{j}_{k}"
for j, k in product(range(1, 5), ["A", "B", "C", "D"])],
n_pca_lr: int=600,
n_pca_hr: int=20,
high_res_sigma: float=0.2,
tof_start: Optional[int]=None,
delta_tof: Optional[int]=300,
validation_size: float=0.05,
bnn: bool=True,
):
self.high_res_sigma = high_res_sigma
# models
self.x_select = SelectRelevantLowResolution(channels, tof_start, delta_tof, poly=not bnn)
x_model_steps = list()
x_model_steps += [
('pca', PCA(n_pca_lr, whiten=True)),
('unc', UncertaintyHolder()),
@@ -788,28 +545,22 @@ class Model(TransformerMixin, BaseEstimator):
('pca', PCA(n_pca_hr, whiten=True)),
('unc', UncertaintyHolder()),
])
self.ood = {ch: UncorrelatedDeviation(sigma=5)
for ch in channels+['full']}
if bnn:
self.fit_model = BNNModel()
else:
self.fit_model = MultiOutputWithStd(BayesianRidge(n_iter=300, tol=1e-8, verbose=True), n_jobs=8)
self.bnn = bnn
self.kde_xgm = None
self.mu_xgm = np.nan
self.sigma_xgm = np.nan
# we are reducing per channel from 2*delta_tof to delta_tof to check correlations
n_pca_lr_per_channel = delta_tof
self.channel_pca = {ch: IncrementalPCA(n_pca_lr_per_channel, whiten=True)
for ch in channels}
# size of the test subset
self.validation_size = validation_size
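# Construction sketch (defaults as above; illustrative only):
# model = Model(bnn=True)   # BNN regressor; polynomial energy features off
# model = Model(bnn=False)  # per-output BayesianRidge; polynomial features on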
@@ -872,7 +623,8 @@ class Model(TransformerMixin, BaseEstimator):
def fit(self, low_res_data: Dict[str, np.ndarray],
high_res_data: np.ndarray, high_res_photon_energy: np.ndarray,
weights: Optional[np.ndarray]=None,
pulse_energy: Optional[np.ndarray]=None,
) -> np.ndarray:
"""
Train the model.
@@ -891,7 +643,8 @@ class Model(TransformerMixin, BaseEstimator):
if weights is None:
weights = np.ones(high_res_data.shape[0])
print("Fitting PCA on low-resolution data.")
self.x_select.fit(low_res_data)
low_res_select = self.x_select.transform(low_res_data, pulse_energy=pulse_energy)
# keep the number of pulses
B, P, _ = low_res_select.shape
low_res_select = low_res_select.reshape((B*P, -1))
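# the PCA below treats each pulse as one sample: (trains B, pulses P, features) -> (B*P, features)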
@@ -902,7 +655,6 @@ class Model(TransformerMixin, BaseEstimator):
print("Fitting PCA on high-resolution data.")
y_t = self.y_model.fit_transform(high_res_data, smoothen__energy=high_res_photon_energy)
print("Fitting outlier detection")
self.ood['full'].fit(x_t)
inliers = self.ood['full'].predict(x_t) > 0.0
@@ -1015,12 +767,6 @@ class Model(TransformerMixin, BaseEstimator):
self.resolution = np.sqrt(energy_var)
#print("Resolution:", self.resolution)
# get intensity effect
intensity = np.sum(z, axis=1)
self.kde_intensity = gaussian_kde(intensity.reshape(-1), bw_method="scott")
self.mu_intensity = np.mean(intensity.reshape(-1), axis=0)
self.sigma_intensity = np.std(intensity.reshape(-1), axis=0)
# for consistency check per channel
selection_model = self.x_select
low_res_selected = selection_model.transform(low_res_data, keep_dictionary_structure=True)
@@ -1057,7 +803,7 @@ class Model(TransformerMixin, BaseEstimator):
result = {ch: is_inlier(low_res_selected[ch], ch) for ch in channels}
return result
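# Returned structure (illustrative): one boolean inlier mask per channel,
# e.g. {"channel_1_A": array([True, False, ...]), ...}.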
def check_compatibility(self, low_res_data: Dict[str, np.ndarray], pulse_spacing: Optional[Dict[str, List[int]]]=None, pulse_energy: Optional[np.ndarray]=None) -> np.ndarray:
"""
Check if a new low-resolution data source is compatible with the one used in training, by
using a robust covariance matrix estimate of the data
@@ -1065,10 +811,11 @@ class Model(TransformerMixin, BaseEstimator):
Args:
low_res_data: Low resolution data as in the fit step with shape (train_id, channel, ToF channel).
pulse_spacing: The pulse spacing in multi-pulse data.
pulse_energy: Pulse energy.
Returns: An outlier score: if it is greater than 0, this is an outlier.
"""
low_res = self.x_select.transform(low_res_data, pulse_spacing=pulse_spacing, pulse_energy=pulse_energy)
B, P, _ = low_res.shape
pca_model = self.x_model
low_pca = pca_model.transform(low_res.reshape((B*P, -1)))
@@ -1078,11 +825,7 @@ class Model(TransformerMixin, BaseEstimator):
"""Get KDE for the XGM intensity."""
return self.kde_xgm
def predict(self, low_res_data: Dict[str, np.ndarray], pulse_spacing: Optional[Dict[str, List[int]]]=None, pulse_energy: Optional[np.ndarray]=None) -> Dict[str, np.ndarray]:
"""
Predict a high-resolution spectrum from a given low-resolution one.
The output includes the uncertainty in its second and third entries of the first dimension.
@@ -1100,7 +843,7 @@ class Model(TransformerMixin, BaseEstimator):
#t += [time_ns()*1e-9]
#n += ["Initial"]
low_res_pre = self.x_select.transform(low_res_data, pulse_spacing=pulse_spacing, pulse_energy=pulse_energy)
B, P, _ = low_res_pre.shape
low_res_pre = low_res_pre.reshape((B*P, -1))
#t += [time_ns()*1e-9]
@@ -1170,11 +913,10 @@ class Model(TransformerMixin, BaseEstimator):
joblib.dump([self.x_select,
self.x_model,
self.y_model,
self.fit_model.state_dict() if self.bnn else self.fit_model,
self.channel_pca,
#self.channel_fit_model
DataHolder(dict(
mu_xgm=self.mu_xgm,
sigma_xgm=self.sigma_xgm,
wiener_filter_ft=self.wiener_filter_ft,
@@ -1184,11 +926,11 @@ class Model(TransformerMixin, BaseEstimator):
resolution=self.resolution,
transfer_function=self.transfer_function,
impulse_response=self.impulse_response,
bnn=self.bnn,
)
),
self.ood,
self.kde_xgm,
], filename, compress='zlib')
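# Round-trip sketch (hypothetical path; assumes the enclosing method is
# save() and its counterpart is the static load() below):
# model.save("model.joblib")
# model2 = Model.load("model.joblib")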
@staticmethod
@@ -1204,26 +946,14 @@ class Model(TransformerMixin, BaseEstimator):
(x_select,
x_model, y_model, fit_model,
channel_pca,
#channel_fit_model
extra,
ood,
kde_xgm,
) = joblib.load(filename)
obj = Model()
extra = extra.get_data()
obj.mu_intensity = extra["mu_intensity"]
obj.sigma_intensity = extra["sigma_intensity"]
obj.mu_xgm = extra["mu_xgm"]
obj.sigma_xgm = extra["sigma_xgm"]
obj.wiener_filter_ft = extra["wiener_filter_ft"]
@@ -1233,5 +963,19 @@ class Model(TransformerMixin, BaseEstimator):
obj.resolution = extra["resolution"]
obj.transfer_function = extra["transfer_function"]
obj.impulse_response = extra["impulse_response"]
obj.bnn = extra["bnn"]
obj.x_select = x_select
obj.x_model = x_model
obj.y_model = y_model
if obj.bnn:
obj.fit_model = BNNModel(state_dict=fit_model)
else:
obj.fit_model = fit_model
obj.channel_pca = channel_pca
#obj.channel_fit_model = channel_fit_model
obj.ood = ood
obj.kde_xgm = kde_xgm
return obj
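# End-to-end usage sketch (names from this module; data and path illustrative):
# model = Model.load("model.joblib")
# prediction = model.predict(low_res_data, pulse_energy=pulse_energy)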