from typing import Optional, Union
import numpy as np
import scipy.stats as stats
import pybop
from pybop.costs.base_cost import BaseCost
from pybop.parameters.parameter import Parameter, Parameters
from pybop.parameters.priors import BasePrior, JointLogPrior, Uniform
from pybop.problems.base_problem import BaseProblem
[docs]
class BaseLikelihood(BaseCost):
"""
Base class for likelihoods
"""
def __init__(self, problem: BaseProblem):
super().__init__(problem)
[docs]
self.n_data = problem.n_data
[docs]
self.minimising = False
[docs]
class GaussianLogLikelihoodKnownSigma(BaseLikelihood):
"""
This class represents a Gaussian Log Likelihood with a known sigma,
which assumes that the data follows a Gaussian distribution and computes
the log-likelihood of observed data under this assumption.
Parameters
----------
sigma0 : scalar or array
Initial standard deviation around ``x0``. Either a scalar value (one
standard deviation for all coordinates) or an array with one entry
per dimension.
"""
def __init__(self, problem: BaseProblem, sigma0: Union[list[float], float]):
super().__init__(problem)
sigma0 = self.check_sigma0(sigma0)
[docs]
self.sigma2 = sigma0**2.0
[docs]
self._offset = -0.5 * self.n_data * np.log(2 * np.pi * self.sigma2)
[docs]
self._multip = -1 / (2.0 * self.sigma2)
[docs]
def compute(
self,
y: dict,
dy: np.ndarray = None,
calculate_grad: bool = False,
) -> Union[float, tuple[float, np.ndarray]]:
"""
Compute the Gaussian log-likelihood for the given parameters with known sigma.
This method only computes the likelihood, without calling the problem.evaluateS1.
"""
# Verify we have dy if calculate_grad is True
self.verify_args(dy, calculate_grad)
# Early return if the prediction is not verified
if not self.verify_prediction(y):
return (-np.inf, -self.grad_fail) if calculate_grad else -np.inf
# Calculate residuals and error
r = np.asarray([self._target[signal] - y[signal] for signal in self.signal])
e = np.sum(self._offset + self._multip * np.sum(np.real(r * np.conj(r))))
if calculate_grad:
dl = np.sum((np.sum((r * dy.T), axis=2) / self.sigma2), axis=1)
return e, dl
return e
[docs]
def check_sigma0(self, sigma0: Union[np.ndarray, float]):
"""
Check the validity of sigma0.
"""
sigma0 = np.asarray(sigma0, dtype=float)
if not np.all(sigma0 > 0):
raise ValueError("Sigma0 must be positive")
if np.shape(sigma0) not in [(), (1,), (self.n_outputs,)]:
raise ValueError(
"sigma0 must be either a scalar value (one standard deviation for "
"all coordinates) or an array with one entry per dimension."
)
return sigma0
[docs]
class GaussianLogLikelihood(BaseLikelihood):
"""
This class represents a Gaussian Log Likelihood, which assumes that the
data follows a Gaussian distribution and computes the log-likelihood of
observed data under this assumption.
This class estimates the standard deviation of the Gaussian distribution
alongside the parameters of the model.
Attributes
----------
_logpi : float
Precomputed offset value for the log-likelihood function.
_dsigma_scale : float
Scale factor for derivative of standard deviation.
"""
def __init__(
self,
problem: BaseProblem,
sigma0: Union[float, list[float], list[Parameter]] = 1e-2,
dsigma_scale: float = 1.0,
):
super().__init__(problem)
[docs]
self._dsigma_scale = dsigma_scale
[docs]
self._logpi = -0.5 * self.n_data * np.log(2 * np.pi)
# Add sigma parameter, join with self.parameters, reapply transformations
[docs]
self.sigma = Parameters()
self._add_sigma_parameters(sigma0)
self.join_parameters(self.sigma)
[docs]
def _add_sigma_parameters(self, sigma0):
sigma0 = [sigma0] if not isinstance(sigma0, list) else sigma0
sigma0 = self._pad_sigma0(sigma0)
for i, value in enumerate(sigma0):
self._add_single_sigma(i, value)
[docs]
def _pad_sigma0(self, sigma0):
if len(sigma0) < self.n_outputs:
return np.pad(
sigma0,
(0, self.n_outputs - len(sigma0)),
constant_values=sigma0[-1],
)
return sigma0
[docs]
def _add_single_sigma(self, index, value):
if isinstance(value, Parameter):
self.sigma.add(value)
elif isinstance(value, (int, float)):
self.sigma.add(
Parameter(
f"Sigma for output {index+1}",
initial_value=value,
prior=Uniform(1e-8 * value, 3 * value),
bounds=[1e-8, 3 * value],
)
)
else:
raise TypeError(
f"Expected sigma0 to contain Parameter objects or numeric values. "
f"Received {type(value)}"
)
@property
[docs]
def dsigma_scale(self):
"""
Scaling factor for the dsigma term in the gradient calculation.
"""
return self._dsigma_scale
@dsigma_scale.setter
def dsigma_scale(self, new_value):
if new_value < 0:
raise ValueError("dsigma_scale must be non-negative")
self._dsigma_scale = new_value
[docs]
def compute(
self,
y: dict,
dy: np.ndarray = None,
calculate_grad: bool = False,
) -> Union[float, tuple[float, np.ndarray]]:
"""
Compute the Gaussian log-likelihood for the given parameters.
This method only computes the likelihood, without calling problem.evaluate().
Returns
-------
float
The log-likelihood value, or -inf if the standard deviations are non-positive.
"""
# Verify we have dy if calculate_grad is True
self.verify_args(dy, calculate_grad)
sigma = self.sigma.current_value()
if not self.verify_prediction(y):
return (-np.inf, -self.grad_fail) if calculate_grad else -np.inf
# Calculate residuals and error
r = np.asarray([self._target[signal] - y[signal] for signal in self.signal])
e = np.sum(
self._logpi
- self.n_data * np.log(sigma)
- np.sum(np.real(r * np.conj(r)), axis=1) / (2.0 * sigma**2.0)
)
if calculate_grad:
dl = np.sum((np.sum((r * dy.T), axis=2) / (sigma**2.0)), axis=1)
dsigma = (
-self.n_data / sigma + np.sum(r**2.0, axis=1) / (sigma**3.0)
) / self._dsigma_scale
dl = np.concatenate((dl.flatten(), dsigma))
return e, dl
return e
[docs]
class ScaledLogLikelihood(BaseMetaLikelihood):
r"""
This class scaled a `BaseLogLikelihood` class by the number of observations.
The scaling factor is given below:
.. math::
\mathcal{\hat{L(\theta)}} = \frac{1}{N} \mathcal{L(\theta)}
This class aims to provide numerical values with lower magnitude than the
canonical likelihoods, which can improve optimiser convergence in certain
cases.
"""
def __init__(self, log_likelihood: BaseLikelihood):
super().__init__(log_likelihood)
[docs]
def compute(
self,
y: dict,
dy: np.ndarray = None,
calculate_grad: bool = False,
) -> Union[float, tuple[float, np.ndarray]]:
likelihood = self._log_likelihood.compute(y, dy, calculate_grad)
normalised_val = 1 / self.n_data
if isinstance(likelihood, tuple):
return tuple(val * normalised_val for val in likelihood)
return likelihood * normalised_val
[docs]
class LogPosterior(BaseMetaLikelihood):
"""
The Log Posterior for a given problem.
Computes the log posterior which is proportional to the sum of the log
likelihood and the log prior.
Parameters
----------
log_likelihood : BaseLikelihood
The likelihood class of type ``BaseLikelihood``.
log_prior : Optional, Union[pybop.BasePrior, stats.rv_continuous]
The prior class of type ``BasePrior`` or ``stats.rv_continuous``.
If not provided, the prior class will be taken from the parameter priors
constructed in the `pybop.Parameters` class.
gradient_step : float, default: 1e-3
The step size for the finite-difference gradient calculation
if the ``log_prior`` is not of type ``BasePrior``.
"""
def __init__(
self,
log_likelihood: BaseLikelihood,
log_prior: Optional[Union[pybop.BasePrior, stats.rv_continuous]] = None,
gradient_step: float = 1e-3,
):
[docs]
self.gradient_step = gradient_step
super().__init__(log_likelihood)
if log_prior is None:
self._prior = JointLogPrior(*self.parameters.priors())
else:
self._prior = log_prior
[docs]
def compute(
self,
y: dict,
dy: np.ndarray = None,
calculate_grad: bool = False,
) -> Union[float, tuple[float, np.ndarray]]:
"""
Calculate the posterior cost for a given forward model prediction.
Parameters
----------
y : dict
The data for which to evaluate the cost.
dy : np.ndarray, optional
The correspond sensitivities in the data.
calculate_grad : bool, optional
Whether to calculate the gradient of the cost function.
Returns
-------
Union[float, Tuple[float, np.ndarray]]
The posterior cost, and optionally the gradient.
"""
# Verify we have dy if calculate_grad is True
self.verify_args(dy, calculate_grad)
if calculate_grad:
if isinstance(self._prior, BasePrior):
log_prior, dp = self._prior.logpdfS1(self.parameters.current_value())
else:
# Compute log prior first
log_prior = self._prior.logpdf(self.parameters.current_value())
# Compute a finite difference approximation of the gradient of the log prior
delta = self.parameters.initial_value() * self.gradient_step
dp = []
for parameter, step_size in zip(self.parameters, delta):
param_value = parameter.value
upper_value = param_value * (1 + step_size)
lower_value = param_value * (1 - step_size)
log_prior_upper = parameter.prior.logpdf(upper_value)
log_prior_lower = parameter.prior.logpdf(lower_value)
gradient = (log_prior_upper - log_prior_lower) / (
2 * step_size * param_value + np.finfo(float).eps
)
dp.append(gradient)
else:
log_prior = self._prior.logpdf(self.parameters.current_value())
if not np.isfinite(log_prior).any():
return (-np.inf, -self.grad_fail) if calculate_grad else -np.inf
if calculate_grad:
log_likelihood, dl = self._log_likelihood.compute(
y, dy, calculate_grad=True
)
posterior = log_likelihood + log_prior
total_gradient = dl + dp
return posterior, total_gradient
log_likelihood = self._log_likelihood.compute(y)
posterior = log_likelihood + log_prior
return posterior
@property
[docs]
def prior(self) -> BasePrior:
return self._prior