From c9f1bc6e16f9ab50e902d3e54061d25b9ba710d9 Mon Sep 17 00:00:00 2001
From: Danilo Ferreira de Lima <danilo.enoque.ferreira.de.lima@xfel.de>
Date: Thu, 15 Dec 2022 11:58:35 +0100
Subject: [PATCH] Added inverse PCA transform and uncertainties.

---
 pes_to_spec/model.py | 289 ++++++++++++++++++++++++++++++++++++++++---
 requirements.txt     |   2 +
 2 files changed, 276 insertions(+), 15 deletions(-)

diff --git a/pes_to_spec/model.py b/pes_to_spec/model.py
index 8ac73a5..60732b1 100644
--- a/pes_to_spec/model.py
+++ b/pes_to_spec/model.py
@@ -1,7 +1,13 @@
 import numpy as np
+from autograd import numpy as anp
+from autograd import grad
+import h5py
 from scipy.signal import fftconvolve
+from scipy.optimize import fmin_l_bfgs_b
 from sklearn.decomposition import PCA
-from typing import Dict, List
+from sklearn.model_selection import train_test_split
+
+from typing import Any, Dict, List, Optional
 
 def matching_ids(a: np.ndarray, b: np.ndarray, c: np.ndarray) -> np.ndarray:
     """Returns list of train IDs common to sets a, b and c."""
@@ -36,11 +42,23 @@ class Model(object):
         self.lr_pca = PCA(n_pca_lr, whiten=True)
         self.hr_pca = PCA(n_pca_hr, whiten=True)
 
+        # PCA reconstruction uncertainty in the high-resolution data
+        self.high_pca_unc: Optional[np.ndarray] = None
+
+        # fit model
+        self.fit_model = FitModel()
+
+        # size of the test subset
+        self.test_size = 0.05
+
         # where to cut on the ToF PES data
         self.tof_start = 31445
         self.delta_tof = 200
         self.tof_end = self.tof_start + self.delta_tof
 
+        # high-resolution photon energy axis
+        self.high_res_photon_energy: Optional[np.ndarray] = None
+
         # smoothing of the SPEC data in eV
         self.high_res_sigma = 0.2
@@ -57,37 +75,48 @@ class Model(object):
         cat = np.concatenate([low_res_data[k][:, self.tof_start:self.tof_end] for k in self.channels], axis=1)
         return cat
 
-    def preprocess_high_res(self, high_res_data: np.ndarray) -> np.ndarray:
+    def preprocess_high_res(self, high_res_data: np.ndarray, high_res_photon_energy: np.ndarray) -> np.ndarray:
         """
         Get the high resolution data and preprocess it.
 
         Args:
             high_res_data: High resolution data with shape (train_id, features).
+            high_res_photon_energy: High resolution photon energy values (the "x" axis of the high resolution data) with shape (train_id, features).
 
         Returns: Pre-processed high-resolution data of shape (train_id, features).
         """
         # Apply smoothing
-        # TODO: Why?!
-        mu = high_res_data[0,high_res_data.shape[1]//2]
-        gaussian = np.exp(-((high_res_data - mu)/self.high_res_sigma)**2/2)/np.sqrt(2*np.pi*self.high_res_sigma**2)
-        # TODO: why 80?!
-        high_res_gc = fftconvolve(high_res_data, gaussian, mode="same", axes=1)/80
+        n_features = high_res_data.shape[1]
+        mu = high_res_photon_energy[0, n_features//2]
+        gaussian = np.exp(-((high_res_photon_energy - mu)/self.high_res_sigma)**2/2)/np.sqrt(2*np.pi*self.high_res_sigma**2)
+        # the division by 80 is an empirically determined normalization factor
+        high_res_gc = fftconvolve(high_res_data, gaussian, mode="same", axes=1)/80.0
         return high_res_gc
 
-    def fit(self, low_res_data: Dict[str, np.ndarray], high_res_data: np.ndarray):
+    def fit(self, low_res_data: Dict[str, np.ndarray], high_res_data: np.ndarray, high_res_photon_energy: np.ndarray):
         """
         Train the model.
 
         Args:
             low_res_data: Low resolution data as a dictionary with the key set to `channel_{i}_{k}`, where i is a number between 1 and 4 and k is a letter between A and D. For each dictionary entry, a numpy array is expected with shape (train_id, ToF channel).
             high_res_data: Reference high resolution data with a one-to-one match to the low resolution data in the train_id dimension. Shape (train_id, ToF channel).
+            high_res_photon_energy: Photon energy axis for the high-resolution data.
         """
+
+        self.high_res_photon_energy = high_res_photon_energy
+
         low_res = self.preprocess_low_res(low_res_data)
-        high_res = self.preprocess_high_res(high_res_data)
+        high_res = self.preprocess_high_res(high_res_data, high_res_photon_energy)
         # fit PCA
         low_pca = self.lr_pca.fit_transform(low_res)
         high_pca = self.hr_pca.fit_transform(high_res)
-        pass
+        # split into train and test sets to evaluate the PCA uncertainty
+        low_pca_train, low_pca_test, high_pca_train, high_pca_test = train_test_split(low_pca, high_pca, test_size=self.test_size)
+        # fit the linear model
+        self.fit_model.fit(low_pca_train, high_pca_train, low_pca_test, high_pca_test)
+
+        # estimate the residual uncertainty of the PCA reconstruction itself
+        high_pca_rec = self.hr_pca.inverse_transform(high_pca)
+        self.high_pca_unc = np.sqrt(np.mean((high_res - high_pca_rec)**2, axis=0))
 
     def predict(self, low_res_data: Dict[str, np.ndarray]) -> np.ndarray:
         """
@@ -101,9 +130,239 @@ class Model(object):
         """
         low_res = self.preprocess_low_res(low_res_data)
         low_pca = self.lr_pca.transform(low_res)
-        # TODO: Get high res.
-        # high_pca = linear_model.predict(low_pca)
-        high_res_predicted = self.hr_pca.inverse_transform(high_pca)
-        # TODO: Add uncertainties
-        return high_res_predicted
+        # predict in PCA space and transform back to the spectrum space
+        high_pca = self.fit_model.predict(low_pca)
+        high_res_predicted = self.hr_pca.inverse_transform(high_pca["Y"])
+        high_res_unc = self.hr_pca.inverse_transform(high_pca["Y"] + high_pca["Y_eps"]) - high_res_predicted
+        # stack prediction, input-dependent uncertainty and PCA reconstruction uncertainty
+        result = np.stack((high_res_predicted, high_res_unc, np.broadcast_to(self.high_pca_unc, high_res_predicted.shape)), axis=0)
+        return result
+
+    def save(self, filename: str):
+        """
+        Save the fit model in a file.
+
+        Args:
+            filename: Name of the HDF5 file in which to save the model.
+        """
+        with h5py.File(filename, 'w') as hf:
+            d = self.fit_model.as_dict()
+            for key, value in d.items():
+                if isinstance(value, int):
+                    hf.attrs[key] = value
+                else:
+                    hf.create_dataset(key, data=value)
+
+    def load(self, filename: str):
+        """
+        Load the model from a file.
+
+        Args:
+            filename: Name of the file from which to read the model.
+        """
+        with h5py.File(filename, 'r') as hf:
+            d = {k: hf[k][()] for k in hf.keys()}
+            d.update({k: hf.attrs[k] for k in hf.attrs})
+            self.fit_model.from_dict(d)
+
+class FitModel(object):
+    """
+    Linear regression model with uncertainties.
+    """
+    def __init__(self):
+        # training dataset
+        self.X_train: Optional[np.ndarray] = None
+        self.Y_train: Optional[np.ndarray] = None
+
+        # test dataset to evaluate uncertainty
+        self.X_test: Optional[np.ndarray] = None
+        self.Y_test: Optional[np.ndarray] = None
+
+        # normalized target
+        self.Y_train_norm = None
+        self.Y_test_norm = None
+
+        # model parameter sizes
+        self.Nx: int = 0
+        self.Ny: int = 0
+
+        # fit result
+        self.A_inf: Optional[np.ndarray] = None
+        self.b_inf: Optional[np.ndarray] = None
+        self.u_inf: Optional[np.ndarray] = None
+        self.A_eps: Optional[np.ndarray] = None
+
+        # fit monitoring
+        self.loss_train: List[float] = list()
+        self.loss_test: List[float] = list()
+
+        self.input_data = None
+
+    def fit(self, X_train: np.ndarray, Y_train: np.ndarray, X_test: np.ndarray, Y_test: np.ndarray):
+        """
+        Perform the fit and evaluate uncertainties with the test set.
+ """ + + # training dataset + self.X_train: np.ndarray = X_train + self.Y_train: np.ndarray = Y_train + + # test dataset to evaluate uncertainty + self.X_test: np.ndarray = X_test + self.Y_test: np.ndarray = Y_test + + # model parameter sizes + self.Nx: int = self.X_train.shape[1] + self.Ny: int = self.Y_train.shape[1] + + # initial parameter values + A0: np.ndarray = np.eye(self.Nx, self.Ny).reshape(self.Nx*self.Ny) + Aeps: np.ndarray = np.zeros(self.Nx) + b0: np.ndarray = np.zeros(self.Ny) + u0: np.ndarray = np.zeros(self.Ny) + x0: np.ndarray = np.concatenate((A0, b0, u0, Aeps), axis=0) + + # reset loss monitoring + self.loss_train: List[float] = list() + self.loss_test: List[float] = list() + + def loss(x: np.ndarray, X: np.ndarray, Y: np.ndarray) -> float: + """ + Calculate the loss function value for a given parameter set `x`. + + Args: + x: The parameters as a flat array. + X: The independent-variable dataset. + Y: The dependent-variable dataset. + + Returns: The loss. + """ + # diag( (in @ x - out) @ (in @ x - out)^T ) + A = x[:self.Nx*self.Ny].reshape((self.Nx, self.Ny)) + b = x[self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape((1, self.Ny)) + + b_eps = x[(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape((1, self.Ny)) + A_eps = x[(self.Nx*self.Ny+self.Ny+self.Ny):].reshape((self.Nx, 1)) + log_unc = X @ A_eps + b_eps + + #log_unc = anp.log(anp.exp(log_unc) + anp.exp(log_eps)) + iunc2 = anp.exp(-2*log_unc) + + #print("iunc2", iunc2) + #print("log_unc", log_unc) + + return anp.mean( (0.5*((X@ A + b - Y)**2)*iunc2 + log_unc).sum(axis=1), axis=0 ) # Put RELU on (XX@x) and introduce new matrix W + + def loss_history(x: np.ndarray) -> float: + """ + Calculate the loss function and keep a history of it in training and testing. + + Args: + x: Parameters flattened out. + + Returns: The loss value. + """ + l_train = loss(x, self.X_train, self.Y_train) + l_test = loss(x, self.X_test, self.Y_test) + + self.loss_train += [l_train] + self.loss_test += [l_test] + return l_train + + def loss_train(x: np.ndarray) -> float: + """ + Calculate the loss function for the training dataset. + + Args: + x: Parameters flattened out. + + Returns: The loss value. + """ + l_train = loss(x, self.X_train, self.Y_train) + return l_train + + grad_loss = grad(loss_train) + sc_op = fmin_l_bfgs_b(loss_history, + x0, + grad_loss, + disp=True, + factr=1e7, + maxiter=100, + iprint=0) + + # Inference + self.A_inf = sc_op[0][:self.Nx*self.Ny].reshape(self.Nx, self.Ny) + self.b_inf = sc_op[0][self.Nx*self.Ny:(self.Nx*self.Ny+self.Ny)].reshape(1, self.Ny) + self.u_inf = sc_op[0][(self.Nx*self.Ny+self.Ny):(self.Nx*self.Ny+self.Ny+self.Ny)].reshape(1, self.Ny) # removed np.exp + self.A_eps = sc_op[0][(self.Nx*self.Ny+self.Ny+self.Ny):].reshape(self.Nx, 1) + + def as_dict(self) -> Dict[str, Any]: + """ + Export relevant variables to rebuild this object from a simple dictionary. + + Args: + + Returns: Dictionary with all relevant variables. + """ + return dict( + X_train=self.X_train, + X_test=self.X_test, + Y_train=self.Y_train, + Y_test=self.Y_test, + A_inf=self.A_inf, + b_inf=self.b_inf, + u_inf=self.u_inf, + A_eps=self.A_eps, + loss_train=self.loss_train, + loss_test=self.loss_test, + Nx=self.Nx, + Ny=self.Ny) + def from_dict(self, in_dict: Dict[str, Any]): + """ + Rebuild this object from a dictionary. + + Args: + in_dict: The input dictionary with relevant variables. 
+ + """ + self.X_train = in_dict["X_train"] + self.X_test = in_dict["X_test"] + self.Y_train = in_dict["Y_train"] + self.Y_test = in_dict["Y_test"] + self.A_inf = in_dict["A_inf"] + self.b_inf = in_dict["b_inf"] + self.u_inf = in_dict["u_inf"] + self.A_eps = in_dict["A_eps"] + self.loss_train = in_dict["loss_train"] + self.loss_test = in_dict["loss_test"] + self.Nx = in_dict["Nx"] + self.Ny = in_dict["Ny"] + + def predict(self, X: np.ndarray) -> Dict[str, np.ndarray]: + """ + Predict y from X. + + Args: + X: Input dataset. + + Returns: Predicted Y. + """ + + result = dict() + + # result + result["Y"] = (X @ self.A_inf + self.b_inf) + # flat uncertainty + result["Y_unc"] = self.u_inf[0,:] + # input-dependent uncertainty + result["Y_eps"] = np.exp(X @ self.A_eps + result["Y_unc"]) + + #self.result["res"] = self.model["spec_pca_model"].inverse_transform(self.result["res_pca"]) # transform PCA space to real space + #self.result["res_unc"] = self.model["spec_pca_model"].inverse_transform(self.result["res_pca"] + self.model["u_inf"]) - self.model["spec_pca_model"].inverse_transform(self.result["res_pca"] ) + #self.result["res_unc"] = np.fabs(self.result["res_unc"]) + #self.result["res_eps"] = self.model["spec_pca_model"].inverse_transform(self.result["res_pca"] + self.result["res_pca_eps"]) - self.model["spec_pca_model"].inverse_transform(self.result["res_pca"] ) + #self.result["res_eps"] = np.fabs(self.result["res_eps"]) + #self.Yhat_pca = self.model["spec_pca_model"].inverse_transform(self.model["Y_test"]) + #self.result["res_unc_specpca"] = np.sqrt(((self.Yhat_pca - self.model["spec_target"])**2).mean(axis=0)) + #self.result["res_unc_total"] = np.sqrt(self.result["res_eps"]**2 + self.result["res_unc_specpca"]**2) + return result diff --git a/requirements.txt b/requirements.txt index f09e9ec..eff7f6b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,6 @@ numpy scipy scikit-learn extra_data +autograd +h5py matplotlib -- GitLab