
Includes input energy parameter in the model and adds non-linearities

Merged: Danilo Enoque Ferreira de Lima requested to merge with_energy into main
1 file: +281 −0
from sklearn.base import BaseEstimator, RegressorMixin
from typing import Any, Dict, Optional, Union, Tuple
import numpy as np
from scipy.special import gamma
import torch
import torch.nn as nn
import torchbnn as bnn
from torch.utils.data import TensorDataset, DataLoader
class AverageMeter(object):
    """Computes and stores the average and current value"""
    def __init__(self, name, fmt=':f'):
        self.name = name
        self.fmt = fmt
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

    def __str__(self):
        fmtstr = '{name} {val' + self.fmt + '} ({avg' + self.fmt + '})'
        return fmtstr.format(**self.__dict__)
class ProgressMeter(object):
    def __init__(self, num_batches, meters, prefix=""):
        self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
        self.meters = meters
        self.prefix = prefix

    def display(self, batch):
        entries = [self.prefix + self.batch_fmtstr.format(batch)]
        entries += [str(meter) for meter in self.meters]
        print(' '.join(entries))

    def _get_batch_fmtstr(self, num_batches):
        num_digits = len(str(num_batches // 1))
        fmt = '{:' + str(num_digits) + 'd}'
        return '[' + fmt + '/' + fmt.format(num_batches) + ']'
class BNN(nn.Module):
    """
    A Bayesian neural network model.

    Each weight is represented by a Gaussian with a mean and a standard deviation.
    Each evaluation of forward leads to a different sample of the weights, so running
    forward several times lets us check the effect of the weights' variation on the same input.

    The neg_log_likelihood function implements the negative log-likelihood to be used as the
    first part of the loss function (the second shall be the Kullback-Leibler divergence).
    The negative log-likelihood is simply the negative log-likelihood of a Gaussian
    between the prediction and the true value. The standard deviation of the Gaussian is left
    as a parameter to be fit: sigma.
    """
    def __init__(self, input_dimension: int=1, output_dimension: int=1):
        super(BNN, self).__init__()
        hidden_dimension = 100
        # controls the aleatoric uncertainty
        self.log_isigma2 = nn.Parameter(-torch.ones(1)*np.log(0.1**2), requires_grad=True)
        # controls the weight hyperprior
        self.log_ilambda2 = nn.Parameter(-torch.ones(1)*np.log(0.1**2), requires_grad=True)

        # inverse Gamma hyperprior alpha and beta
        #
        # Hyperprior choice on the weights:
        # We want to allow the hyperprior on the weights' variance to have a large variance,
        # so that the weights prior can be anything, if possible, but at the same time prevent it from going to infinity
        # (which would allow the weights to be anything, but remove regularization and de-stabilize the fit).
        # Therefore, the weights should be allowed to have a high std. dev. in their priors, just not so much that the fit becomes unstable.
        # At the same time, the prior std. dev. should not be too small (that would regularize too much).
        # The values below have been taken from BoTorch (alpha, beta) = (3.0, 6.0) and seem to work well if the inputs have been standardized.
        # They lead to a high mean for the weights' std. dev. (18) and a large variance (sqrt(var) = 10.4), so that the weights prior is broad
        # and the only regularization is to prevent the weights from becoming > 18 + 3 sqrt(var) ~= 50, making this a very loose regularization.
        # An alternative would be to set (alpha, beta) both to very low values, which makes the hyperprior closer to the non-informative Jeffreys prior.
        # Using this alternative (i.e. (0.1, 0.1) for the weights' hyperprior) leads to a very large lambda and numerical issues in the fit.
        self.alpha_lambda = 3.0
        self.beta_lambda = 6.0
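        # For reference, the mean and spread quoted above are the moments of a Gamma
        # distribution with shape k = 3.0 and scale theta = 6.0:
        #   mean = k*theta = 18, variance = k*theta**2 = 108, sqrt(variance) ~= 10.4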
        # Hyperprior choice on the likelihood noise level:
        # The likelihood noise level is controlled by sigma in the likelihood, and it should be allowed to be very broad; but,
        # unlike the weights prior, it must be allowed to be small, since if we have a lot of data, it is conceivable that there is little noise in the data.
        # We therefore want to have high variance in the hyperprior for sigma, but we do not need to prevent it from becoming small.
        # Making both alpha and beta small brings the Gamma distribution closer to the Jeffreys prior, which makes it non-informative.
        # This seems to lead to a longer training time, though.
        # Since, after standardization, we expect the variance to be of order 1, we can also select alpha and beta leading to high variance in this range.
        self.alpha_sigma = 2.0
        self.beta_sigma = 0.15
        self.model = nn.Sequential(
            bnn.BayesLinear(prior_mu=0.0,
                            prior_sigma=torch.exp(-0.5*self.log_ilambda2),
                            in_features=input_dimension,
                            out_features=hidden_dimension),
            nn.ReLU(),
            bnn.BayesLinear(prior_mu=0.0,
                            prior_sigma=torch.exp(-0.5*self.log_ilambda2),
                            in_features=hidden_dimension,
                            out_features=output_dimension)
        )
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Calculate the result f(x) applied on the input x.
        """
        return self.model(x)

    def neg_log_gamma(self, log_x: torch.Tensor, x: torch.Tensor, alpha, beta) -> torch.Tensor:
        """
        Return the negative log of the Gamma pdf with shape alpha and rate beta:
        -log p(x) = -alpha*log(beta) + log(Gamma(alpha)) - (alpha - 1)*log(x) + beta*x.
        """
        return -alpha*np.log(beta) - (alpha - 1)*log_x + beta*x + np.log(gamma(alpha))
    def neg_log_likelihood(self, prediction: torch.Tensor, target: torch.Tensor, w: torch.Tensor) -> torch.Tensor:
        """
        Calculate the negative log-likelihood (divided by the batch size, since we take the mean).
        """
        n_output = target.shape[1]
        error = w*(prediction - target)
        squared_error = error**2
        sigma2 = torch.exp(-self.log_isigma2)[0]
        norm_error = 0.5*squared_error/sigma2
        norm_term = 0.5*(np.log(2*np.pi) - self.log_isigma2[0])*n_output
        return norm_error.sum(dim=1).mean(dim=0) + norm_term
    def neg_log_hyperprior(self) -> torch.Tensor:
        """
        Calculate the negative log of the hyperpriors.
        """
        # hyperprior for sigma to avoid a too large or too small sigma;
        # with a standardized input, this hyperprior forces sigma to be
        # on avg. 1 and it is broad enough to allow for different sigma
        isigma2 = torch.exp(self.log_isigma2)[0]
        neg_log_hyperprior_noise = self.neg_log_gamma(self.log_isigma2, isigma2, self.alpha_sigma, self.beta_sigma)
        ilambda2 = torch.exp(self.log_ilambda2)[0]
        neg_log_hyperprior_weights = self.neg_log_gamma(self.log_ilambda2, ilambda2, self.alpha_lambda, self.beta_lambda)
        return neg_log_hyperprior_noise + neg_log_hyperprior_weights
    def aleatoric_uncertainty(self) -> torch.Tensor:
        """
        Get the aleatoric component of the uncertainty.
        """
        return torch.exp(-0.5*self.log_isigma2[0])

    def w_precision(self) -> torch.Tensor:
        """
        Get the weights precision.
        """
        return torch.exp(self.log_ilambda2[0])
class BNNModel(RegressorMixin, BaseEstimator):
    """
    Regression model with uncertainties.

    Args:
        state_dict: Pre-trained state dictionary of a BNN. If given, the network
                    dimensions are inferred from it and its weights are loaded;
                    otherwise a fresh BNN with default dimensions is created.
    """
    def __init__(self, state_dict=None):
        if state_dict is not None:
            Nx = state_dict["model.0.weight_mu"].shape[1]
            Ny = state_dict["model.2.weight_mu"].shape[0]
            self.model = BNN(Nx, Ny)
            self.model.load_state_dict(state_dict)
        else:
            self.model = BNN()
        self.model.eval()

    def state_dict(self) -> Dict[str, Any]:
        return self.model.state_dict()
    def fit(self, X: np.ndarray, y: np.ndarray, weights: Optional[np.ndarray]=None, **fit_params) -> RegressorMixin:
        """
        Perform the fit.

        Args:
            X: The input.
            y: The target.
            weights: The weights.
            fit_params: Additional fit parameters (currently unused).

        Returns: The object itself.
        """
        if weights is None:
            weights = np.ones(len(X), dtype=np.float32)
        if len(weights.shape) == 1:
            weights = weights[:, np.newaxis]
        ds = TensorDataset(torch.from_numpy(X),
                           torch.from_numpy(y),
                           torch.from_numpy(weights))

        # create model
        self.model = BNN(X.shape[1], y.shape[1])

        # prepare data loader
        B = 5
        loader = DataLoader(ds,
                            batch_size=B,
                            num_workers=5,
                            shuffle=True,
                            #pin_memory=True,
                            drop_last=True,
                            )

        optimizer = torch.optim.Adam(self.model.parameters(), lr=1e-3)

        number_of_batches = len(ds)/float(B)
        weight_prior = 1.0/float(number_of_batches)
        # the NLL is divided by the number of batch samples,
        # so divide also the prior losses by the number of batch elements, so that the
        # function optimized is F/(# samples)
        # https://arxiv.org/pdf/1505.05424.pdf
        weight_prior /= float(B)
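        # To spell out the scaling: with N samples and batch size B there are N/B
        # batches, and the batch NLL is already a mean over B samples, so weighting
        # the KL and hyperprior terms by 1/((N/B)*B) = 1/N makes each batch loss an
        # estimate of (sum of NLLs + KL + hyperprior)/N, i.e. F/(# samples).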
        # KL loss
        kl_loss = bnn.BKLLoss(reduction='sum', last_layer_only=False)

        # train
        self.model.train()
        epochs = 200
        for epoch in range(epochs):
            meter = {k: AverageMeter(k, ':6.3f')
                     for k in ('loss', '-log(lkl)', '-log(prior)', '-log(hyper)', 'sigma', 'w.prec.')}
            progress = ProgressMeter(
                            len(loader),
                            meter.values(),
                            prefix="Epoch: [{}]".format(epoch))
            for i, batch in enumerate(loader):
                x_b, y_b, w_b = batch
                y_b_pred = self.model(x_b)

                nll = self.model.neg_log_likelihood(y_b_pred, y_b, w_b)
                nlprior = weight_prior * kl_loss(self.model)
                nlhyper = weight_prior * self.model.neg_log_hyperprior()
                loss = nll + nlprior + nlhyper

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                meter['loss'].update(loss.detach().cpu().item(), B)
                meter['-log(lkl)'].update(nll.detach().cpu().item(), B)
                meter['-log(prior)'].update(nlprior.detach().cpu().item(), B)
                meter['-log(hyper)'].update(nlhyper.detach().cpu().item(), B)
                meter['sigma'].update(self.model.aleatoric_uncertainty().detach().cpu().item(), B)
                meter['w.prec.'].update(self.model.w_precision().detach().cpu().item(), B)
            progress.display(len(loader))

        self.model.eval()
        return self
    def predict(self, X: np.ndarray, return_std: bool=False) -> Union[Tuple[np.ndarray, np.ndarray], np.ndarray]:
        """
        Predict y from X.

        Args:
            X: Input dataset.
            return_std: Whether to also return the uncertainty estimate.

        Returns: Predicted Y and, if return_std is True, also its uncertainty.
        """
        K = 10
        y_pred = list()
        for _ in range(K):
            # each forward pass samples a new set of weights (epistemic uncertainty)
            y_k = self.model(torch.from_numpy(X)).detach().numpy()
            y_pred.append(y_k)
        y_pred = np.stack(y_pred, axis=1)
        y_mu = np.mean(y_pred, axis=1)
        y_epi = np.std(y_pred, axis=1)
        y_ale = self.model.aleatoric_uncertainty().detach().numpy()
        y_unc = (y_epi**2 + y_ale**2)**0.5
        if not return_std:
            return y_mu
        return y_mu, y_unc
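
A minimal usage sketch of the new class, assuming standardized float32 arrays with a 2-D target (the shapes fit() expects); the synthetic data below is only illustrative:

    import numpy as np

    rng = np.random.default_rng(0)
    X = rng.normal(size=(200, 3)).astype(np.float32)                            # standardized inputs
    y = (np.sin(X[:, :1]) + 0.1*rng.normal(size=(200, 1))).astype(np.float32)   # 2-D target

    model = BNNModel()
    model.fit(X, y)                                  # trains a fresh BNN(3, 1) for 200 epochs
    y_mu, y_unc = model.predict(X, return_std=True)  # mean and total (epistemic + aleatoric) uncertainty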