Skip to content
Snippets Groups Projects
Commit 69335d7c authored by Danilo Enoque Ferreira de Lima's avatar Danilo Enoque Ferreira de Lima
Browse files

Merge branch 'allow_ard' into 'main'

Allow for three model options: Ridge, ARD and BNN.

See merge request !13
parents 6d81308b 80ef2d0d
No related branches found
No related tags found
1 merge request!13Allow for three model options: Ridge, ARD and BNN.
This diff is collapsed.
......@@ -8,8 +8,8 @@ from sklearn.decomposition import IncrementalPCA, PCA
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.base import OutlierMixin
from sklearn.pipeline import Pipeline
from sklearn.kernel_approximation import Nystroem
from sklearn.linear_model import BayesianRidge
from sklearn.linear_model import ARDRegression
from sklearn.metrics import accuracy_score
from scipy.stats import gaussian_kde
from itertools import product
......@@ -20,7 +20,7 @@ from copy import deepcopy
from pes_to_spec.bnn import BNNModel
from typing import Any, Dict, List, Optional, Union, Tuple
from typing import Any, Dict, List, Optional, Union, Tuple, Literal
def matching_ids(a: np.ndarray, b: np.ndarray, c: np.ndarray) -> np.ndarray:
"""Returns list of train IDs common to sets a, b and c."""
......@@ -336,30 +336,37 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
sum_low_res = - np.mean(sum(list(X.values())), axis=0)
peak_idx = self.estimate_prompt_peak(X)
import matplotlib.pyplot as plt
fig = plt.figure(figsize=(8, 16))
fig = plt.figure(figsize=(8, 8))
ax = plt.gca()
ax.plot(np.arange(peak_idx-300, peak_idx+300),
sum_low_res[peak_idx-300:peak_idx+300],
c="b",
first = peak_idx - 300
last = peak_idx + 300
first = max(0, first)
last = min(sum_low_res.shape[0]-1, last)
ax.plot(np.arange(first, last),
sum_low_res[first:last],
c="k",
label="Data")
ax.set(title="",
xlabel="Photon Spectrometer channel",
ylabel="Sum of all Photon Spectrometer channels")
plt.axvline(peak_idx,
linewidth=3,
ls="--",
color='r',
label="Peak position")
ax.legend()
xlabel="Photon Spectrometer bin",
ylabel=r"$\sum$ PES channels")
#plt.axvline(peak_idx,
# linewidth=3,
# ls="--",
# color='r',
# label="Peak position")
#ax.legend(frameon=False)
plt.savefig(filename)
plt.close(fig)
def _fit_estimator(estimator, X: np.ndarray, y: np.ndarray, w: np.ndarray):
def _fit_estimator(estimator, X: np.ndarray, y: np.ndarray, w: Optional[np.ndarray]=None):
estimator = clone(estimator)
estimator.fit(X, y, w)
if w is None:
estimator.fit(X, y)
else:
estimator.fit(X, y, w)
return estimator
class MultiOutputWithStd(MetaEstimatorMixin, BaseEstimator):
class MultiOutputRidgeWithStd(MetaEstimatorMixin, BaseEstimator):
def __init__(self, estimator, *, n_jobs=-1):
self.estimator = estimator
......@@ -382,9 +389,6 @@ class MultiOutputWithStd(MetaEstimatorMixin, BaseEstimator):
"y must have at least two dimensions for "
"multi-output regression but has only one."
)
if weights is None:
weights = np.ones(y.shape[0])
self.estimators_ = Parallel(n_jobs=self.n_jobs)(
delayed(_fit_estimator)(
self.estimator, X, y[:, i], weights
......@@ -416,6 +420,59 @@ class MultiOutputWithStd(MetaEstimatorMixin, BaseEstimator):
y_std = np.sqrt(sigmas_squared_data + self.fast_inv_alpha)
return y, y_std
class MultiOutputGenericWithStd(MetaEstimatorMixin, BaseEstimator):
def __init__(self, estimator, *, n_jobs=-1):
self.estimator = estimator
self.n_jobs = n_jobs
def fit(self, X: np.ndarray, y: np.ndarray, weights: Optional[np.ndarray]=None):
"""Fit the model to data, separately for each output variable.
Args:
X: {array-like, sparse matrix} of shape (n_samples, n_features)
The input data.
y: {array-like, sparse matrix} of shape (n_samples, n_outputs)
Multi-output targets. An indicator matrix turns on multilabel
estimation.
Returns: self.
"""
if y.ndim == 1:
raise ValueError(
"y must have at least two dimensions for "
"multi-output regression but has only one."
)
self.estimators_ = Parallel(n_jobs=self.n_jobs)(
delayed(_fit_estimator)(
self.estimator, X, y[:, i], weights
)
for i in range(y.shape[1])
)
return self
def predict(self, X: np.ndarray, return_std: bool=False) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
"""Predict multi-output variable using model for each target variable.
Args:
X: {array-like, sparse matrix} of shape (n_samples, n_features)
The input data.
Returns: {array-like, sparse matrix} of shape (n_samples, n_outputs)
Multi-output targets predicted across multiple predictors.
Note: Separate models are generated for each predictor.
"""
y = Parallel(n_jobs=self.n_jobs)(
delayed(e.predict)(X, return_std) for e in self.estimators_
)
if return_std:
y, unc = zip(*y)
return np.asarray(y).T, np.asarray(unc).T
return np.asarray(y).T
class UncorrelatedDeviation(OutlierMixin, BaseEstimator):
"""
Detect outliers from uncorrelated inputs.
......@@ -517,23 +574,23 @@ class Model(TransformerMixin, BaseEstimator):
Set to None to perform no selection.
validation_size: Fraction (number between 0 and 1) of the data to take for
validation and systematic uncertainty estimate.
bnn: Use BNN?
model_type: Which model to use. "bnn" for a BNN, "ridge" for Ridge and "ard" for ARD.
"""
def __init__(self,
channels:List[str]=[f"channel_{j}_{k}"
for j, k in product(range(1, 5), ["A", "B", "C", "D"])],
n_pca_lr: int=600,
n_pca_lr: int=1000,
n_pca_hr: int=20,
high_res_sigma: float=0.2,
tof_start: Optional[int]=None,
delta_tof: Optional[int]=300,
validation_size: float=0.05,
bnn: bool=True,
model_type: Literal["bnn", "ridge", "ard"]="ard",
):
self.high_res_sigma = high_res_sigma
# models
self.x_select = SelectRelevantLowResolution(channels, tof_start, delta_tof, poly=not bnn)
self.x_select = SelectRelevantLowResolution(channels, tof_start, delta_tof, poly=(model_type not in ["bnn"]))
x_model_steps = list()
x_model_steps += [
('pca', PCA(n_pca_lr, whiten=True)),
......@@ -547,11 +604,14 @@ class Model(TransformerMixin, BaseEstimator):
])
self.ood = {ch: UncorrelatedDeviation(sigma=5)
for ch in channels+['full']}
if bnn:
if model_type == "bnn":
self.fit_model = BNNModel()
else:
self.fit_model = MultiOutputWithStd(BayesianRidge(n_iter=300, tol=1e-8, verbose=True), n_jobs=8)
self.bnn = bnn
elif model_type == "ridge":
self.fit_model = MultiOutputRidgeWithStd(BayesianRidge(n_iter=300, tol=1e-8, verbose=True), n_jobs=8)
elif model_type == "ard":
self.fit_model = MultiOutputGenericWithStd(ARDRegression(n_iter=300, tol=1e-8, verbose=True), n_jobs=8)
self.model_type = model_type
self.n_obs = 0
self.kde_xgm = None
self.mu_xgm = np.nan
......@@ -565,6 +625,12 @@ class Model(TransformerMixin, BaseEstimator):
# size of the test subset
self.validation_size = validation_size
def n_pars(self) -> float:
"""Get number of parameters."""
if self.model_type == "bnn":
return sum(p.numel() for p in self.fit_model.model.parameters())
return sum(len(estimator.coef_) + 1 for estimator in self.fit_model.estimators_)
def get_channels(self) -> List[str]:
"""Get channels used in training."""
return self.x_select.channels
......@@ -621,6 +687,33 @@ class Model(TransformerMixin, BaseEstimator):
w = w/np.median(w)
return w
def count_peaks(self, hr_data: np.ndarray, energy: np.ndarray) -> np.ndarray:
"""
Count number of peaks in the high-resolution data.
Args:
hr_data: High-resolution data. Shape (train_id, bin).
energy: Energy axis in eV.
Returns: Number of peaks per train ID.
"""
bkg_sigma = 5
threshold = 20
avg_energy = energy[:, energy.shape[1]//2, np.newaxis]
gaussian_bkg = np.exp(-0.5*(energy - avg_energy)**2/bkg_sigma**2)
gaussian_bkg /= np.sum(gaussian_bkg, axis=1, keepdims=True)
gaussian = np.exp(-0.5*(energy - avg_energy)**2/self.high_res_sigma**2)
gaussian /= np.sum(gaussian, axis=1, keepdims=True)
smooth = fftconvolve(hr_data, gaussian, mode="same", axes=1)
bkg = fftconvolve(hr_data, gaussian_bkg, mode="same", axes=1)
extrema = np.diff(smooth, axis=1)
d2 = np.diff(extrema, axis=1)
peaks = ((extrema[:, :-1]*extrema[:, 1:] < 0)
& (d2 < 0)
& (smooth[:,:-2] > bkg[:,:-2])
& (smooth[:,:-2] > threshold) )
return np.count_nonzero(peaks, axis=1)
def fit(self, low_res_data: Dict[str, np.ndarray],
high_res_data: np.ndarray, high_res_photon_energy: np.ndarray,
weights: Optional[np.ndarray]=None,
......@@ -640,8 +733,10 @@ class Model(TransformerMixin, BaseEstimator):
Returns: Smoothened high resolution spectrum.
"""
if weights is None:
weights = np.ones(high_res_data.shape[0])
print("Checking data quality in high-resolution data.")
peaks = self.count_peaks(high_res_data, high_res_photon_energy)
filter_hr = (peaks > 3)
print("Fitting PCA on low-resolution data.")
self.x_select.fit(low_res_data)
low_res_select = self.x_select.transform(low_res_data, pulse_energy=pulse_energy)
......@@ -652,14 +747,19 @@ class Model(TransformerMixin, BaseEstimator):
n_components = min(self.x_model["pca"].n_components, low_res_select.shape[0])
self.x_model.set_params(pca__n_components=n_components)
x_t = self.x_model.fit_transform(low_res_select)
#print("PCA fraction of variance (LR): ", np.cumsum(self.x_model["pca"].explained_variance_ratio_))
print("Fitting PCA on high-resolution data.")
y_t = self.y_model.fit_transform(high_res_data, smoothen__energy=high_res_photon_energy)
#print("PCA fraction of variance (HR): ", np.cumsum(self.y_model["pca"].explained_variance_ratio_))
print("Fitting outlier detection")
self.ood['full'].fit(x_t)
inliers = self.ood['full'].predict(x_t) > 0.0
print("Fitting model.")
self.fit_model.fit(x_t[inliers], y_t[inliers], weights[inliers])
if weights is not None:
weights = weights[inliers & filter_hr]
self.fit_model.fit(x_t[inliers & filter_hr], y_t[inliers & filter_hr], weights)
self.n_obs = len(x_t[inliers & filter_hr])
# calculate the effect of the PCA
print("Calculate PCA unc. on high-resolution data.")
......@@ -677,10 +777,10 @@ class Model(TransformerMixin, BaseEstimator):
# n: noise (uncertainty)
# e: energy
# true signal (as far as we can get -- it is smoothened, but this is the model target)
d = high_res[inliers]
d = high_res[inliers & filter_hr]
D = np.fft.fft(d)
y_pred, n = self.fit_model.predict(x_t[inliers], return_std=True)
y_pred, n = self.fit_model.predict(x_t[inliers & filter_hr], return_std=True)
z = self.y_model['pca'].inverse_transform(y_pred)
n = np.sqrt((self.y_model['pca'].inverse_transform(y_pred + n) - z)**2 + high_pca_unc**2)
......@@ -885,6 +985,8 @@ class Model(TransformerMixin, BaseEstimator):
unc=unc.reshape((B, P, -1)),
pca=pca_unc,
total_unc=total_unc.reshape((B, P, -1)),
expected_pca=high_pca.reshape((B, P, -1)),
expected_pca_unc=high_pca_unc.reshape((B, P, -1)),
)
def deconvolve(self, expected: np.ndarray) -> np.ndarray:
......@@ -913,7 +1015,7 @@ class Model(TransformerMixin, BaseEstimator):
joblib.dump([self.x_select,
self.x_model,
self.y_model,
self.fit_model.state_dict() if self.bnn else self.fit_model,
self.fit_model.state_dict() if self.model_type == "bnn" else self.fit_model,
self.channel_pca,
#self.channel_fit_model
DataHolder(dict(
......@@ -926,7 +1028,8 @@ class Model(TransformerMixin, BaseEstimator):
resolution=self.resolution,
transfer_function=self.transfer_function,
impulse_response=self.impulse_response,
bnn=self.bnn,
model_type=self.model_type,
n_obs=self.n_obs,
)
),
self.ood,
......@@ -963,12 +1066,13 @@ class Model(TransformerMixin, BaseEstimator):
obj.resolution = extra["resolution"]
obj.transfer_function = extra["transfer_function"]
obj.impulse_response = extra["impulse_response"]
obj.bnn = extra["bnn"]
obj.model_type = extra["model_type"]
obj.n_obs = extra["n_obs"]
obj.x_select = x_select
obj.x_model = x_model
obj.y_model = y_model
if obj.bnn:
if obj.model_type == "bnn":
obj.fit_model = BNNModel(state_dict=fit_model)
else:
obj.fit_model = fit_model
......
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment