Source code for pybop.costs._likelihoods

from typing import Optional, Union

import numpy as np
import scipy.stats as stats

import pybop
from pybop.costs.base_cost import BaseCost
from pybop.parameters.parameter import Inputs, Parameter, Parameters
from pybop.parameters.priors import BasePrior, JointLogPrior, Uniform
from pybop.problems.base_problem import BaseProblem


[docs] class BaseLikelihood(BaseCost): """ Base class for likelihoods. """ def __init__(self, problem: BaseProblem): super().__init__(problem)
[docs] self.n_data = problem.n_data
[docs] self.minimising = False
[docs] def observed_fisher( self, inputs: Union[Inputs, list, np.ndarray] ) -> Union[np.ndarray, None]: """ Compute the observed Fisher Information Matrix (FIM) for the given data. The Fisher information is computed as the outer product of the gradients with respect to the model parameters, scaled by the inverse of the dataset size. This method should only be used with exponential-based likelihood functions. Parameters ---------- inputs : Union[dict[str, float], list-like] Input data for model evaluation. Returns ------- np.ndarray The observed Fisher Information Matrix. """ # Check gradients are available, return None if not. if not self.problem.sensitivities_available: return # Calculate the fisher information via gradient outer-product _, grad = self.__call__(inputs, calculate_grad=True) shaped_grad = grad.reshape(-1, 1) fisher_info = (shaped_grad @ shaped_grad.T) / self.n_data return fisher_info
[docs] class BaseMetaLikelihood(BaseLikelihood): """ Base class for likelihood classes which have a meta-likelihood such as `LogPosterior` or `ScaledLoglikelihood`. This class points the required attributes towards the composed likelihood class. """ def __init__(self, log_likelihood: BaseLikelihood):
[docs] self._log_likelihood = log_likelihood
super().__init__(log_likelihood.problem) @property
[docs] def transformation(self): return self._log_likelihood.transformation
@property
[docs] def has_separable_problem(self): return self._log_likelihood.has_separable_problem
@property
[docs] def parameters(self): return self._log_likelihood.parameters
@property
[docs] def n_parameters(self): return self._log_likelihood.n_parameters
@property
[docs] def likelihood(self) -> BaseLikelihood: return self._log_likelihood
[docs] class GaussianLogLikelihoodKnownSigma(BaseLikelihood): """ This class represents a Gaussian Log Likelihood with a known sigma, which assumes that the data follows a Gaussian distribution and computes the log-likelihood of observed data under this assumption. Parameters ---------- sigma0 : scalar or array Initial standard deviation around ``x0``. Either a scalar value (one standard deviation for all coordinates) or an array with one entry per dimension. """ def __init__(self, problem: BaseProblem, sigma0: Union[list[float], float]): super().__init__(problem) sigma0 = self.check_sigma0(sigma0)
[docs] self.sigma2 = sigma0**2.0
[docs] self._offset = -0.5 * self.n_data * np.log(2 * np.pi * self.sigma2)
[docs] self._multip = -1 / (2.0 * self.sigma2)
[docs] def compute( self, y: dict, dy: Optional[np.ndarray] = None, ) -> Union[float, tuple[float, np.ndarray]]: """ Compute the Gaussian log-likelihood for the given parameters with known sigma. This method only computes the likelihood, without calling the problem.evaluateS1. """ # Early return if the prediction is not verified if not self.verify_prediction(y): return (-np.inf, -self.grad_fail) if dy is not None else -np.inf # Calculate residuals and error r = np.asarray([self._target[signal] - y[signal] for signal in self.signal]) e = np.sum(self._offset + self._multip * np.sum(np.real(r * np.conj(r)))) if dy is not None: dl = np.sum((np.sum((r * dy.T), axis=2) / self.sigma2), axis=1) return e, dl return e
[docs] def check_sigma0(self, sigma0: Union[np.ndarray, float]): """ Check the validity of sigma0. """ sigma0 = np.asarray(sigma0, dtype=float) if not np.all(sigma0 > 0): raise ValueError("Sigma0 must be positive") if np.shape(sigma0) not in [(), (1,), (self.n_outputs,)]: raise ValueError( "sigma0 must be either a scalar value (one standard deviation for " "all coordinates) or an array with one entry per dimension." ) return sigma0
[docs] class GaussianLogLikelihood(BaseLikelihood): """ This class represents a Gaussian Log Likelihood, which assumes that the data follows a Gaussian distribution and computes the log-likelihood of observed data under this assumption. This class estimates the standard deviation of the Gaussian distribution alongside the parameters of the model. Attributes ---------- _logpi : float Precomputed offset value for the log-likelihood function. _dsigma_scale : float Scale factor for derivative of standard deviation. """ def __init__( self, problem: BaseProblem, sigma0: Union[float, list[float], list[Parameter]] = 1e-2, dsigma_scale: float = 1.0, ): super().__init__(problem)
[docs] self._dsigma_scale = dsigma_scale
[docs] self._logpi = -0.5 * self.n_data * np.log(2 * np.pi)
# Add sigma parameter, join with self.parameters, reapply transformations
[docs] self.sigma = Parameters()
self._add_sigma_parameters(sigma0) self.join_parameters(self.sigma)
[docs] self.transformation = self._parameters.construct_transformation()
[docs] def _add_sigma_parameters(self, sigma0): sigma0 = [sigma0] if not isinstance(sigma0, list) else sigma0 sigma0 = self._pad_sigma0(sigma0) for i, value in enumerate(sigma0): self._add_single_sigma(i, value)
[docs] def _pad_sigma0(self, sigma0): if len(sigma0) < self.n_outputs: return np.pad( sigma0, (0, self.n_outputs - len(sigma0)), constant_values=sigma0[-1], ) return sigma0
[docs] def _add_single_sigma(self, index, value): if isinstance(value, Parameter): self.sigma.add(value) elif isinstance(value, (int, float)): self.sigma.add( Parameter( f"Sigma for output {index + 1}", initial_value=value, prior=Uniform(1e-8 * value, 3 * value), bounds=[1e-8, 3 * value], ) ) else: raise TypeError( f"Expected sigma0 to contain Parameter objects or numeric values. " f"Received {type(value)}" )
@property
[docs] def dsigma_scale(self): """ Scaling factor for the dsigma term in the gradient calculation. """ return self._dsigma_scale
@dsigma_scale.setter def dsigma_scale(self, new_value): if new_value < 0: raise ValueError("dsigma_scale must be non-negative") self._dsigma_scale = new_value
[docs] def compute( self, y: dict, dy: Optional[np.ndarray] = None, ) -> Union[float, tuple[float, np.ndarray]]: """ Compute the Gaussian log-likelihood for the given parameters. This method only computes the likelihood, without calling problem.evaluate(). Returns ------- float The log-likelihood value, or -inf if the standard deviations are non-positive. """ sigma = self.sigma.current_value() if not self.verify_prediction(y): return (-np.inf, -self.grad_fail) if dy is not None else -np.inf # Calculate residuals and error r = np.asarray([self._target[signal] - y[signal] for signal in self.signal]) e = np.sum( self._logpi - self.n_data * np.log(sigma) - np.sum(np.real(r * np.conj(r)), axis=1) / (2.0 * sigma**2.0) ) if dy is not None: dl = np.sum((np.sum((r * dy.T), axis=2) / (sigma**2.0)), axis=1) dsigma = ( -self.n_data / sigma + np.sum(r**2.0, axis=1) / (sigma**3.0) ) / self._dsigma_scale dl = np.concatenate((dl.flatten(), dsigma)) return e, dl return e
[docs] class ScaledLogLikelihood(BaseMetaLikelihood): r""" This class scaled a `BaseLogLikelihood` class by the number of observations. The scaling factor is given below: .. math:: \mathcal{\hat{L(\theta)}} = \frac{1}{N} \mathcal{L(\theta)} This class aims to provide numerical values with lower magnitude than the canonical likelihoods, which can improve optimiser convergence in certain cases. """
[docs] def compute( self, y: dict, dy: Optional[np.ndarray] = None, ) -> Union[float, tuple[float, np.ndarray]]: likelihood = self._log_likelihood.compute(y, dy) normalised_val = 1 / self.n_data if isinstance(likelihood, tuple): return tuple(val * normalised_val for val in likelihood) return likelihood * normalised_val
[docs] class LogPosterior(BaseMetaLikelihood): """ The Log Posterior for a given problem. Computes the log posterior which is proportional to the sum of the log likelihood and the log prior. Parameters ---------- log_likelihood : BaseLikelihood The likelihood class of type ``BaseLikelihood``. log_prior : Optional, Union[pybop.BasePrior, stats.rv_continuous] The prior class of type ``BasePrior`` or ``stats.rv_continuous``. If not provided, the prior class will be taken from the parameter priors constructed in the `pybop.Parameters` class. gradient_step : float, default: 1e-3 The step size for the finite-difference gradient calculation if the ``log_prior`` is not of type ``BasePrior``. """ def __init__( self, log_likelihood: BaseLikelihood, log_prior: Optional[Union[pybop.BasePrior, stats.rv_continuous]] = None, gradient_step: float = 1e-3, ):
[docs] self.gradient_step = gradient_step
super().__init__(log_likelihood) if log_prior is None: self._prior = JointLogPrior(*self.parameters.priors()) else: self._prior = log_prior
[docs] def compute( self, y: dict, dy: Optional[np.ndarray] = None, ) -> Union[float, tuple[float, np.ndarray]]: """ Calculate the posterior cost for a given forward model prediction. Parameters ---------- y : dict The data for which to evaluate the cost. dy : np.ndarray, optional The correspond sensitivities in the data. Returns ------- Union[float, Tuple[float, np.ndarray]] The posterior cost, and optionally the gradient. """ if dy is not None: if isinstance(self._prior, BasePrior): log_prior, dp = self._prior.logpdfS1(self.parameters.current_value()) else: # Compute log prior first log_prior = self._prior.logpdf(self.parameters.current_value()) # Compute a finite difference approximation of the gradient of the log prior delta = self.parameters.initial_value() * self.gradient_step dp = [] for parameter, step_size in zip(self.parameters, delta): param_value = parameter.value upper_value = param_value * (1 + step_size) lower_value = param_value * (1 - step_size) log_prior_upper = parameter.prior.logpdf(upper_value) log_prior_lower = parameter.prior.logpdf(lower_value) gradient = (log_prior_upper - log_prior_lower) / ( 2 * step_size * param_value + np.finfo(float).eps ) dp.append(gradient) else: log_prior = self._prior.logpdf(self.parameters.current_value()) if not np.isfinite(log_prior).any(): return (-np.inf, -self.grad_fail) if dy is not None else -np.inf if dy is not None: log_likelihood, dl = self._log_likelihood.compute(y, dy) posterior = log_likelihood + log_prior total_gradient = dl + dp return posterior, total_gradient log_likelihood = self._log_likelihood.compute(y) posterior = log_likelihood + log_prior return posterior
@property
[docs] def prior(self) -> BasePrior: return self._prior