Source code for pybop.costs._likelihoods

from typing import List, Tuple, Union

import numpy as np

from pybop.costs.base_cost import BaseCost
from pybop.parameters.parameter import Inputs, Parameter, Parameters
from pybop.parameters.priors import Uniform
from pybop.problems.base_problem import BaseProblem


[docs] class BaseLikelihood(BaseCost): """ Base class for likelihoods """ def __init__(self, problem: BaseProblem): super(BaseLikelihood, self).__init__(problem)
[docs] self.n_time_data = problem.n_time_data
[docs] class GaussianLogLikelihoodKnownSigma(BaseLikelihood): """ This class represents a Gaussian Log Likelihood with a known sigma, which assumes that the data follows a Gaussian distribution and computes the log-likelihood of observed data under this assumption. Parameters ---------- sigma0 : scalar or array Initial standard deviation around ``x0``. Either a scalar value (one standard deviation for all coordinates) or an array with one entry per dimension. """ def __init__(self, problem: BaseProblem, sigma0: Union[List[float], float]): super(GaussianLogLikelihoodKnownSigma, self).__init__(problem)
[docs] sigma0 = self.check_sigma0(sigma0)
[docs] self.sigma2 = sigma0**2.0
[docs] self._offset = -0.5 * self.n_time_data * np.log(2 * np.pi * self.sigma2)
[docs] self._multip = -1 / (2.0 * self.sigma2)
[docs] self._dl = np.ones(self.n_parameters)
[docs] def _evaluate(self, inputs: Inputs, grad: Union[None, np.ndarray] = None) -> float: """ Evaluates the Gaussian log-likelihood for the given parameters with known sigma. """ y = self.problem.evaluate(inputs) if any( len(y.get(key, [])) != len(self._target.get(key, [])) for key in self.signal ): return -np.inf # prediction length doesn't match target e = np.sum( [ np.sum( self._offset + self._multip * np.sum((self._target[signal] - y[signal]) ** 2.0) ) for signal in self.signal ] ) return e if self.n_outputs != 1 else e.item()
[docs] def _evaluateS1(self, inputs: Inputs) -> Tuple[float, np.ndarray]: """ Calls the problem.evaluateS1 method and calculates the log-likelihood and gradient. """ y, dy = self.problem.evaluateS1(inputs) if any( len(y.get(key, [])) != len(self._target.get(key, [])) for key in self.signal ): return -np.inf, -self._dl likelihood = self._evaluate(inputs) r = np.asarray([self._target[signal] - y[signal] for signal in self.signal]) dl = np.sum((np.sum((r * dy.T), axis=2) / self.sigma2), axis=1) return likelihood, dl
[docs] def check_sigma0(self, sigma0: Union[np.ndarray, float]): """ Check the validity of sigma0. """ sigma0 = np.asarray(sigma0, dtype=float) if not np.all(sigma0 > 0): raise ValueError("Sigma0 must be positive") if np.shape(sigma0) not in [(), (1,), (self.n_outputs,)]: raise ValueError( "sigma0 must be either a scalar value (one standard deviation for " + "all coordinates) or an array with one entry per dimension." ) return sigma0
[docs] class GaussianLogLikelihood(BaseLikelihood): """ This class represents a Gaussian Log Likelihood, which assumes that the data follows a Gaussian distribution and computes the log-likelihood of observed data under this assumption. This class estimates the standard deviation of the Gaussian distribution alongside the parameters of the model. Attributes ---------- _logpi : float Precomputed offset value for the log-likelihood function. _dsigma_scale : float Scale factor for derivative of standard deviation. """ def __init__( self, problem: BaseProblem, sigma0: Union[float, List[float], List[Parameter]] = 0.002, dsigma_scale: float = 1.0, ): super(GaussianLogLikelihood, self).__init__(problem)
[docs] self._dsigma_scale = dsigma_scale
[docs] self._logpi = -0.5 * self.n_time_data * np.log(2 * np.pi)
[docs] self.sigma = Parameters()
self._add_sigma_parameters(sigma0) self.parameters.join(self.sigma)
[docs] self._dl = np.ones(self.n_parameters)
[docs] def _add_sigma_parameters(self, sigma0): sigma0 = [sigma0] if not isinstance(sigma0, List) else sigma0 sigma0 = self._pad_sigma0(sigma0) for i, value in enumerate(sigma0): self._add_single_sigma(i, value)
[docs] def _pad_sigma0(self, sigma0): if len(sigma0) < self.n_outputs: return np.pad( sigma0, (0, self.n_outputs - len(sigma0)), constant_values=sigma0[-1], ) return sigma0
[docs] def _add_single_sigma(self, index, value): if isinstance(value, Parameter): self.sigma.add(value) elif isinstance(value, (int, float)): self.sigma.add( Parameter( f"Sigma for output {index+1}", initial_value=value, prior=Uniform(0.5 * value, 1.5 * value), ) ) else: raise TypeError( f"Expected sigma0 to contain Parameter objects or numeric values. " f"Received {type(value)}" )
@property
[docs] def dsigma_scale(self): """ Scaling factor for the dsigma term in the gradient calculation. """ return self._dsigma_scale
@dsigma_scale.setter def dsigma_scale(self, new_value): if new_value < 0: raise ValueError("dsigma_scale must be non-negative") self._dsigma_scale = new_value
[docs] def _evaluate(self, inputs: Inputs, grad: Union[None, np.ndarray] = None) -> float: """ Evaluates the Gaussian log-likelihood for the given parameters. Parameters ---------- inputs : Inputs The parameters for which to evaluate the log-likelihood, including the `n_outputs` standard deviations of the Gaussian distributions. Returns ------- float The log-likelihood value, or -inf if the standard deviations are non-positive. """ self.parameters.update(values=list(inputs.values())) sigma = self.sigma.current_value() if np.any(sigma <= 0): return -np.inf y = self.problem.evaluate(self.problem.parameters.as_dict()) if any( len(y.get(key, [])) != len(self._target.get(key, [])) for key in self.signal ): return -np.inf # prediction length doesn't match target e = np.sum( [ np.sum( self._logpi - self.n_time_data * np.log(sigma) - np.sum((self._target[signal] - y[signal]) ** 2.0) / (2.0 * sigma**2.0) ) for signal in self.signal ] ) return e if self.n_outputs != 1 else e.item()
[docs] def _evaluateS1(self, inputs: Inputs) -> Tuple[float, np.ndarray]: """ Calls the problem.evaluateS1 method and calculates the log-likelihood. Parameters ---------- inputs : Inputs The parameters for which to evaluate the log-likelihood. Returns ------- Tuple[float, np.ndarray] The log-likelihood and its gradient. """ self.parameters.update(values=list(inputs.values())) sigma = self.sigma.current_value() if np.any(sigma <= 0): return -np.inf, -self._dl y, dy = self.problem.evaluateS1(self.problem.parameters.as_dict()) if any( len(y.get(key, [])) != len(self._target.get(key, [])) for key in self.signal ): return -np.inf, -self._dl likelihood = self._evaluate(inputs) r = np.asarray([self._target[signal] - y[signal] for signal in self.signal]) dl = np.sum((np.sum((r * dy.T), axis=2) / (sigma**2.0)), axis=1) dsigma = ( -self.n_time_data / sigma + np.sum(r**2.0, axis=1) / (sigma**3.0) ) / self._dsigma_scale dl = np.concatenate((dl.flatten(), dsigma)) return likelihood, dl
[docs] class MAP(BaseLikelihood): """ Maximum a posteriori cost function. Computes the maximum a posteriori cost function, which is the sum of the log likelihood and the log prior. The goal of maximising is achieved by setting minimising = False in the optimiser settings. Inherits all parameters and attributes from ``BaseLikelihood``. """ def __init__(self, problem, likelihood, sigma0=None, gradient_step=1e-3): super(MAP, self).__init__(problem)
[docs] self.sigma0 = sigma0
[docs] self.gradient_step = gradient_step
if self.sigma0 is None: self.sigma0 = [] for param in self.problem.parameters: self.sigma0.append(param.prior.sigma) try: self.likelihood = likelihood(problem=self.problem, sigma0=self.sigma0) except Exception as e: raise ValueError( f"An error occurred when constructing the Likelihood class: {e}" ) if hasattr(self, "likelihood") and not isinstance( self.likelihood, BaseLikelihood ): raise ValueError(f"{self.likelihood} must be a subclass of BaseLikelihood")
[docs] def _evaluate(self, inputs: Inputs, grad=None) -> float: """ Calculate the maximum a posteriori cost for a given set of parameters. Parameters ---------- inputs : Inputs The parameters for which to evaluate the cost. grad : array-like, optional An array to store the gradient of the cost function with respect to the parameters. Returns ------- float The maximum a posteriori cost. """ log_likelihood = self.likelihood._evaluate(inputs) log_prior = sum( self.parameters[key].prior.logpdf(value) for key, value in inputs.items() ) posterior = log_likelihood + log_prior return posterior
[docs] def _evaluateS1(self, inputs: Inputs) -> Tuple[float, np.ndarray]: """ Compute the maximum a posteriori with respect to the parameters. The method passes the likelihood gradient to the optimiser without modification. Parameters ---------- inputs : Inputs The parameters for which to compute the cost and gradient. Returns ------- tuple A tuple containing the cost and the gradient. The cost is a float, and the gradient is an array-like of the same length as `x`. Raises ------ ValueError If an error occurs during the calculation of the cost or gradient. """ log_likelihood, dl = self.likelihood._evaluateS1(inputs) log_prior = sum( self.parameters[key].prior.logpdf(value) for key, value in inputs.items() ) # Compute a finite difference approximation of the gradient of the log prior delta = self.parameters.initial_value() * self.gradient_step prior_gradient = [] for parameter, step_size in zip(self.problem.parameters, delta): param_value = inputs[parameter.name] log_prior_upper = parameter.prior.logpdf(param_value * (1 + step_size)) log_prior_lower = parameter.prior.logpdf(param_value * (1 - step_size)) gradient = (log_prior_upper - log_prior_lower) / ( 2 * step_size * param_value + np.finfo(float).eps ) prior_gradient.append(gradient) posterior = log_likelihood + log_prior total_gradient = dl + prior_gradient return posterior, total_gradient