Skip to content
Snippets Groups Projects
Commit c9f1bc6e authored by Danilo Ferreira de Lima's avatar Danilo Ferreira de Lima
Browse files

Added inverse PCA transform and uncs.

parent 13def540
No related branches found
No related tags found
No related merge requests found
import numpy as np
from autograd import numpy as anp
from autograd import grad
import h5py
from scipy.signal import fftconvolve
from scipy.optimize import fmin_l_bfgs_b
from sklearn.decomposition import PCA
from typing import Dict, List
from sklearn.model_selection import train_test_split
from typing import Dict, List, Optional
def matching_ids(a: np.ndarray, b: np.ndarray, c: np.ndarray) -> np.ndarray:
"""Returns list of train IDs common to sets a, b and c."""
......@@ -36,11 +42,23 @@ class Model(object):
self.lr_pca = PCA(n_pca_lr, whiten=True)
self.hr_pca = PCA(n_pca_hr, whiten=True)
# PCA unc. in high resolution
self.high_pca_unc: Optional[np.ndarray] = None
# fit model
self.fit_model = FitModel()
# size of the test subset
self.test_size = 0.05
# where to cut on the ToF PES data
self.tof_start = 31445
self.delta_tof = 200
self.tof_end = self.tof_start + self.delta_tof
# high-resolution photon energy axis
self.high_res_photon_energy: Optional[np.ndarray] = None
# smoothing of the SPEC data in eV
self.high_res_sigma = 0.2
......@@ -57,37 +75,48 @@ class Model(object):
cat = np.concatenate([low_res_data[k][:, self.tof_start:self.tof_end] for k in self.channels], axis=1)
return cat
def preprocess_high_res(self, high_res_data: np.ndarray) -> np.ndarray:
def preprocess_high_res(self, high_res_data: np.ndarray, high_res_photon_energy: np.ndarray) -> np.ndarray:
"""
Get the high resolution data and preprocess it.
Args:
high_res_data: High resolution data with shape (train_id, features).
high_res_photon_energy: High resolution photon energy values (the "x"-axis of the high resolution data) with shape (train_id, features).
Returns: Pre-processed high-resolution data of shape (train_id, features) before.
"""
# Apply smoothing
# TODO: Why?!
mu = high_res_data[0,high_res_data.shape[1]//2]
gaussian = np.exp(-((high_res_data - mu)/self.high_res_sigma)**2/2)/np.sqrt(2*np.pi*self.high_res_sigma**2)
# TODO: why 80?!
high_res_gc = fftconvolve(high_res_data, gaussian, mode="same", axes=1)/80
n_features = high_res_data.shape[1]
mu = high_res_photon_energy[0, n_features//2]
gaussian = np.exp(-((high_res_photon_energy - mu)/self.high_res_sigma)**2/2)/np.sqrt(2*np.pi*self.high_res_sigma**2)
# 80 to match normalization (empirically taken)
high_res_gc = fftconvolve(high_res_data, gaussian, mode="same", axes=1)/80.0
return high_res_gc
def fit(self, low_res_data: Dict[str, np.ndarray], high_res_data: np.ndarray):
def fit(self, low_res_data: Dict[str, np.ndarray], high_res_data: np.ndarray, high_res_photon_energy: np.ndarray):
"""
Train the model.
Args:
low_res_data: Low resolution data as a dictionary with the key set to `channel_{i}_{k}`, where i is a number between 1 and 4 and k is a letter between A and D. For each dictionary entry, a numpy array is expected with shape (train_id, ToF channel).
high_res_data: Reference high resolution data with a one-to-one match to the low resolution data in the train_id dimension. Shape (train_id, ToF channel).
high_res_photon_energy: Photon energy axis for the high-resolution data.
"""
self.high_res_photon_energy = high_res_photon_energy
low_res = self.preprocess_low_res(low_res_data)
high_res = self.preprocess_high_res(high_res_data)
high_res = self.preprocess_high_res(high_res_data, high_res_photon_energy)
# fit PCA
low_pca = self.lr_pca.fit_transform(low_res)
high_pca = self.hr_pca.fit_transform(high_res)
pass
# split in train and test for PCA uncertainty evaluation
low_pca_train, low_pca_test, high_pca_train, high_pca_test = train_test_split(low_pca, high_pca, test_size=self.test_size)
# fit the linear model
self.fit_model.fit(low_pca_train, high_pca_train, low_pca_test, high_pca_test)
high_pca_rec = self.hr_pca.inverse_transform(high_pca)
self.high_pca_unc = np.sqrt(np.mean((high_res - high_pca_rec)**2, axis=0))
def predict(self, low_res_data: Dict[str, np.ndarray]) -> np.ndarray:
"""
......@@ -101,9 +130,239 @@ class Model(object):
"""
low_res = self.preprocess_low_res(low_res_data)
low_pca = self.lr_pca.transform(low_res)
# TODO: Get high res.
# high_pca = linear_model.predict(low_pca)
high_res_predicted = self.hr_pca.inverse_transform(high_pca)
# TODO: Add uncertainties
return high_res_predicted
# Get high res.
high_pca = self.fit_model.predict(low_pca, None, None)
high_res_predicted = self.hr_pca.inverse_transform(high_pca["Y"])
high_res_unc = self.hr_pca.inverse_transform(high_pca["Y"] + high_pca["Y_eps"]) - high_pca_predicted
result = np.stack((high_res_predicted, high_res_unc, self.high_pca_unc), axis=0)
return result
def save(self, filename: str):
"""
Save the fit model in a file.
Args:
filename: H5 file name where to save this.
"""
with h5py.File(filename, 'w') as hf:
d = self.fit_model.as_dict()
for key, value in d.items():
if isinstance(value, int):
hf.attrs[key] = value
else:
hf.create_dataset(key, data=value)
def load(self, filename: str):
"""
Load model from a file.
Args:
filename: Name of the file where to read the model from.
"""
with h5py.File(filename, 'r') as hf:
d = {k: hf[k][()] for k in hf.keys()}
d.update({k: hf.attrs[k] for k in hf.attrs})
self.fit_model.from_dict(d)
class FitModel(object):
"""
Linear regression model with uncertainties.
"""
def __init__(self):
# training dataset
self.X_train: Optional[np.ndarray] = None
self.Y_train: Optional[np.ndarray] = None
# test dataset to evaluate uncertainty
self.X_test: Optional[np.ndarray] = None
self.Y_test: Optional[np.ndarray] = None
# normalized target
self.Y_train_norm = None
self.Y_test_norm = None
# model parameter sizes
self.Nx: int = 0
self.Ny: int = 0
# fit result
self.A_inf: np.ndarray = None
self.b_inf: np.ndarray = None
self.u_inf: np.ndarray = None
# fit monitoring
self.loss_train: List[float] = list()
self.loss_test: List[float] = list()
self.input_data = None
def fit(self, X_train: np.ndarray, Y_train: np.ndarray, X_test: np.ndarray, Y_test: np.ndarray):
"""
Perform the fit and evaluate uncertainties with the test set.
"""
# training dataset
self.X_train: np.ndarray = X_train
self.Y_train: np.ndarray = Y_train
# test dataset to evaluate uncertainty
self.X_test: np.ndarray = X_test
self.Y_test: np.ndarray = Y_test
# model parameter sizes
self.Nx: int = self.X_train.shape[1]
self.Ny: int = self.Y_train.shape[1]
# initial parameter values
A0: np.ndarray = np.eye(self.Nx, self.Ny).reshape(self.Nx*self.Ny)
Aeps: np.ndarray = np.zeros(self.Nx)
b0: np.ndarray = np.zeros(self.Ny)
u0: np.ndarray = np.zeros(self.Ny)
x0: np.ndarray = np.concatenate((A0, b0, u0, Aeps), axis=0)
# reset loss monitoring
self.loss_train: List[float] = list()
self.loss_test: List[float] = list()
def loss(x: np.ndarray, X: np.ndarray, Y: np.ndarray) -> float:
"""
Calculate the loss function value for a given parameter set `x`.
Args:
x: The parameters as a flat array.
X: The independent-variable dataset.
Y: The dependent-variable dataset.
Returns: The loss.
"""
# diag( (in @ x - out) @ (in @ x - out)^T )
A = x[:self.Nx*self.Ny].reshape((self.Nx, self.Ny))
b = x[self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape((1, self.Ny))
b_eps = x[(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape((1, self.Ny))
A_eps = x[(self.Nx*self.Ny+self.Ny+self.Ny):].reshape((self.Nx, 1))
log_unc = X @ A_eps + b_eps
#log_unc = anp.log(anp.exp(log_unc) + anp.exp(log_eps))
iunc2 = anp.exp(-2*log_unc)
#print("iunc2", iunc2)
#print("log_unc", log_unc)
return anp.mean( (0.5*((X@ A + b - Y)**2)*iunc2 + log_unc).sum(axis=1), axis=0 ) # Put RELU on (XX@x) and introduce new matrix W
def loss_history(x: np.ndarray) -> float:
"""
Calculate the loss function and keep a history of it in training and testing.
Args:
x: Parameters flattened out.
Returns: The loss value.
"""
l_train = loss(x, self.X_train, self.Y_train)
l_test = loss(x, self.X_test, self.Y_test)
self.loss_train += [l_train]
self.loss_test += [l_test]
return l_train
def loss_train(x: np.ndarray) -> float:
"""
Calculate the loss function for the training dataset.
Args:
x: Parameters flattened out.
Returns: The loss value.
"""
l_train = loss(x, self.X_train, self.Y_train)
return l_train
grad_loss = grad(loss_train)
sc_op = fmin_l_bfgs_b(loss_history,
x0,
grad_loss,
disp=True,
factr=1e7,
maxiter=100,
iprint=0)
# Inference
self.A_inf = sc_op[0][:self.Nx*self.Ny].reshape(self.Nx, self.Ny)
self.b_inf = sc_op[0][self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape(1, self.Ny)
self.u_inf = sc_op[0][(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape(1, self.Ny) # removed np.exp
self.A_eps = sc_op[0][(self.Nx*self.Ny+self.Ny+self.Ny):].reshape(self.Nx, 1)
def as_dict(self) -> Dict[str, Any]:
"""
Export relevant variables to rebuild this object from a simple dictionary.
Args:
Returns: Dictionary with all relevant variables.
"""
return dict(
X_train=self.X_train,
X_test=self.X_test,
Y_train=self.Y_train,
Y_test=self.Y_test,
A_inf=self.A_inf,
b_inf=self.b_inf,
u_inf=self.u_inf,
A_eps=self.A_eps,
loss_train=self.loss_train,
loss_test=self.loss_test,
Nx=self.Nx,
Ny=self.Ny)
def from_dict(self, in_dict: Dict[str, Any]):
"""
Rebuild this object from a dictionary.
Args:
in_dict: The input dictionary with relevant variables.
"""
self.X_train = in_dict["X_train"]
self.X_test = in_dict["X_test"]
self.Y_train = in_dict["Y_train"]
self.Y_test = in_dict["Y_test"]
self.A_inf = in_dict["A_inf"]
self.b_inf = in_dict["b_inf"]
self.u_inf = in_dict["u_inf"]
self.A_eps = in_dict["A_eps"]
self.loss_train = in_dict["loss_train"]
self.loss_test = in_dict["loss_test"]
self.Nx = in_dict["Nx"]
self.Ny = in_dict["Ny"]
def predict(self, X: np.ndarray) -> Dict[str, np.ndarray]:
"""
Predict y from X.
Args:
X: Input dataset.
Returns: Predicted Y.
"""
result = dict()
# result
result["Y"] = (X @ self.A_inf + self.b_inf)
# flat uncertainty
result["Y_unc"] = self.u_inf[0,:]
# input-dependent uncertainty
result["Y_eps"] = np.exp(X @ self.A_eps + result["Y_unc"])
#self.result["res"] = self.model["spec_pca_model"].inverse_transform(self.result["res_pca"]) # transform PCA space to real space
#self.result["res_unc"] = self.model["spec_pca_model"].inverse_transform(self.result["res_pca"] + self.model["u_inf"]) - self.model["spec_pca_model"].inverse_transform(self.result["res_pca"] )
#self.result["res_unc"] = np.fabs(self.result["res_unc"])
#self.result["res_eps"] = self.model["spec_pca_model"].inverse_transform(self.result["res_pca"] + self.result["res_pca_eps"]) - self.model["spec_pca_model"].inverse_transform(self.result["res_pca"] )
#self.result["res_eps"] = np.fabs(self.result["res_eps"])
#self.Yhat_pca = self.model["spec_pca_model"].inverse_transform(self.model["Y_test"])
#self.result["res_unc_specpca"] = np.sqrt(((self.Yhat_pca - self.model["spec_target"])**2).mean(axis=0))
#self.result["res_unc_total"] = np.sqrt(self.result["res_eps"]**2 + self.result["res_unc_specpca"]**2)
return result
......@@ -2,4 +2,6 @@ numpy
scipy
scikit-learn
extra_data
autograd
h5py
matplotlib
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment