from collections.abc import Callable
from dataclasses import dataclass
from time import time
import numpy as np
from scipy.optimize import Bounds, OptimizeResult, differential_evolution, minimize
import pybop
from pybop import (
BaseOptimiser,
OptimisationResult,
PopulationEvaluator,
ScalarEvaluator,
)
from pybop._logging import Logger
from pybop.problems.problem import Problem
__all__: list[str] = [
"SciPyMinimize",
"SciPyDifferentialEvolution",
]
[docs]
class BaseSciPyOptimiser(BaseOptimiser):
"""
A base class for defining optimisation methods from the SciPy library.
Parameters
----------
problem : pybop.Problem
The problem to optimise.
options : pybop.OptimiserOptions
Valid SciPy option keys and their values.
"""
def __init__(
self,
problem: Problem,
options: pybop.OptimiserOptions | None,
):
super().__init__(problem, options=options)
[docs]
def scipy_bounds(self) -> Bounds:
bounds = self.problem.parameters.get_bounds(transformed=True)
# Convert bounds to SciPy format
if isinstance(bounds, dict):
return Bounds(bounds["lower"], bounds["upper"], True)
elif isinstance(bounds, Bounds) or bounds is None:
return bounds
else:
raise TypeError(
"Bounds provided must be either type dict or SciPy.optimize.bounds object."
)
[docs]
def _set_callback(self):
"""Add a callback to record the iteration number."""
def base_callback(intermediate_result: OptimizeResult | np.ndarray):
"""
Update the iteration number. Depending on the optimisation method, intermediate_result
may be either an OptimizeResult or an array of the current parameter values.
"""
self._logger.iteration += 1
user_callback = self._options_dict.pop("callback", None)
if user_callback is not None:
self._options_dict["callback"] = (
lambda x: user_callback(base_callback(x))
if self._options_dict.get("method", None) != "trust-constr"
else lambda x, intermediate_result: user_callback(
base_callback(intermediate_result)
)
)
else:
self._options_dict["callback"] = (
base_callback
if self._options_dict.get("method", None) != "trust-constr"
else lambda x, intermediate_result: base_callback(intermediate_result)
)
@dataclass
[docs]
class SciPyMinimizeOptions(pybop.OptimiserOptions):
"""
Options for the SciPy minimize method.
Attributes
----------
method : str, optional
The optimisation method to use (default: None).
jac : bool, optional
Method for computing the gradient vector (default: None).
tol : float, optional
Tolerance for termination (default: None).
maxiter : int, optional
Maximum number of iterations (default: 1000).
disp : bool, optional
Set to True to print convergence messages (default: False).
constraints : scipy constraint or list of scipy constraints, optional
Constraints definition. Only for COBYLA, COBYQA, SLSQP and trust-constr.
solver_options : dict, optional
A dictionary of additional solver options, not including maxiter or disp.
"""
method: str | None = None
jac: bool | None = None
tol: float | None = None
maxiter: int = 1000
disp: bool = False
constraints: list | None = None
solver_options: dict | None = None
[docs]
def validate(self):
super().validate()
if self.maxiter <= 0:
raise ValueError("maxiter must be a positive integer.")
if self.tol is not None and self.tol <= 0:
raise ValueError("tol must be a positive float.")
[docs]
def to_dict(self) -> dict:
"""Convert the options to a dictionary format."""
if self.solver_options is not None:
solver_options = self.solver_options.copy()
else:
solver_options = {}
for key in ["maxiter", "disp"]:
if getattr(self, key) is not None:
solver_options[key] = getattr(self, key)
ret = {"options": solver_options}
for key in ["method", "jac", "tol", "constraints"]:
if getattr(self, key) is not None:
ret[key] = getattr(self, key)
return ret
[docs]
class SciPyMinimize(BaseSciPyOptimiser):
"""
Adapter for various scalar minimisation algorithms implemented in SciPy, allowing fine-tuning
of the optimisation process through method selection and option configuration.
Parameters
----------
problem : pybop.Problem
The problem to optimise.
options: ScipyMinizeOptions, optional
Options for the SciPy minimize method (default: None).
See Also
--------
scipy.optimize.minimize : The SciPy method this class is based on.
Notes
-----
Different optimisation methods may support different options. Consult SciPy's
documentation for method-specific options and constraints.
"""
def __init__(
self,
problem: Problem,
options: SciPyMinimizeOptions | None = None,
):
options = options or self.default_options()
super().__init__(problem=problem, options=options)
@staticmethod
[docs]
def default_options() -> SciPyMinimizeOptions:
"""Returns the default options for the optimiser."""
return SciPyMinimizeOptions()
[docs]
def _set_up_optimiser(self):
"""
Parse optimiser options.
"""
self._options_dict = self._options.to_dict()
self._cost0 = np.abs(self.problem.get_finite_initial_cost())
self._x0 = self.problem.parameters.get_initial_values(transformed=True)
self._options_dict["x0"] = self._x0
self._options_dict["bounds"] = self.scipy_bounds()
# If the problem has sensitivities, enable the Jacobian by default
if self._options_dict.get("jac", None) is None:
self._options_dict["jac"] = self.problem.has_sensitivities
self._needs_sensitivities = True if self._options_dict["jac"] else False
# Create logger and evaluator objects
self._logger = Logger(
minimising=self.problem.minimising,
verbose=self.verbose,
verbose_print_rate=self.verbose_print_rate,
)
self._evaluator = ScalarEvaluator(
problem=self._problem,
minimise=True,
with_sensitivities=self._needs_sensitivities,
logger=self._logger,
)
self._set_callback()
[docs]
def cost_wrapper(self, x):
"""
Scale the cost function, preserving the sign, and eliminate nan values.
"""
if not self._needs_sensitivities:
cost = self._evaluator.evaluate(x)
scaled_cost = cost / self._cost0
if np.isinf(scaled_cost):
self.inf_count += 1
scaled_cost = np.sign(cost) * (
1 + 0.9**self.inf_count
) # for fake finite gradient
return scaled_cost
L, dl = self._evaluator.evaluate(x)
return (L[0] / self._cost0, dl / self._cost0)
[docs]
def _run(self):
"""
Executes the optimisation process using SciPy's minimize function.
Returns
-------
result : scipy.optimize.OptimizeResult
The result of the optimisation including the optimised parameter values and cost.
"""
start_time = time()
# Set counters
self.inf_count = 0
self._logger.iteration = 1
result: OptimizeResult = minimize(fun=self.cost_wrapper, **self._options_dict)
if hasattr(result, "nit"):
# Subtract final callback depending on method
self._logger.iteration = result.nit
total_time = time() - start_time
# Log the optimised result as the final evaluation
self._evaluator.evaluate(result.x)
return OptimisationResult(
optim=self,
logger=self._logger,
time=total_time,
optim_name=self.name,
message=result.message,
scipy_result=result,
)
@property
[docs]
def name(self):
"""Provides the name of the optimisation strategy."""
return "SciPyMinimize"
@dataclass
[docs]
class SciPyDifferentialEvolutionOptions(pybop.OptimiserOptions):
"""
Options for the SciPy differential evolution method.
Parameters
----------
strategy : str, optional
The differential evolution strategy to use. Should be one of:
- 'best1bin'
- 'best1exp'
- 'rand1exp'
- 'randtobest1exp'
- 'currenttobest1exp'
- 'best2exp'
- 'rand2exp'
- 'randtobest1bin'
- 'currenttobest1bin'
- 'best2bin'
- 'rand2bin'
- 'rand1bin'
Default is 'best1bin'.
maxiter : int, optional
Maximum number of generations (default: 1000).
popsize : int, optional
Multiplier for setting the total population size. The population has
popsize * len(x) individuals (default: 15).
tol : float, optional
Relative tolerance for convergence (default: 0.01).
mutation : float or tuple(float, float), optional
The mutation constant. If specified as a float, should be in [0, 2].
If specified as a tuple (min, max), dithering is used (default: (0.5, 1.0)).
recombination : float, optional
The recombination constant, should be in [0, 1] (default: 0.7).
seed : int, optional
Random seed for reproducibility.
disp : bool, optional
Display status messages (default: False).
callback : Callable, optional
Called after each iteration with the current result as argument.
polish : bool, optional
If True, performs a local optimisation on the solution (default: True).
init : str or array-like, optional
Specify initial population. Can be 'latinhypercube', 'random',
or an array of shape (M, len(x)).
atol : float, optional
Absolute tolerance for convergence (default: 0).
updating : {'immediate', 'deferred'}, optional
If 'immediate', best solution vector is continuously updated within
a single generation (default: 'immediate').
workers : int or map-like Callable, optional
If workers is an int the population is subdivided into workers
sections and evaluated in parallel (default: 1).
constraints : {NonlinearConstraint, LinearConstraint, Bounds}, optional
Constraints on the solver.
"""
strategy: str = "best1bin"
maxiter: int = 1000
tol: float = 0.01
popsize: int | None = None
mutation: float | tuple | None = None
recombination: float | None = None
seed: int | None = None
disp: bool | None = None
callback: Callable | None = None
polish: bool | None = None
init: str | np.ndarray | None = None
atol: float | None = None
[docs]
def to_dict(self) -> dict:
"""Convert the options to a dictionary format."""
ret = {
"strategy": self.strategy,
"maxiter": self.maxiter,
"tol": self.tol,
}
optional_keys = [
"popsize",
"mutation",
"recombination",
"seed",
"disp",
"callback",
"polish",
"init",
"atol",
]
for key in optional_keys:
if getattr(self, key) is not None:
ret[key] = getattr(self, key)
return ret
[docs]
class SciPyDifferentialEvolution(BaseSciPyOptimiser):
"""
Adapter for SciPy's differential_evolution function for global optimisation, useful for problems
involving continuous parameters and potentially multiple local minima.
Parameters
----------
problem : pybop.Problem
The problem to optimise.
options: SciPyDifferentialEvolutionOptions, optional
Options for the SciPy differential evolution method (default: None).
See Also
--------
scipy.optimize.differential_evolution : The SciPy method this class is based on.
Notes
-----
Differential Evolution is a stochastic population based method that is useful for
global optimisation problems. At each pass through the population the algorithm mutates
each candidate solution by mixing with other candidate solutions to create a trial
candidate. The fitness of all candidates is then evaluated and for each candidate if
the trial candidate is an improvement, it takes its place in the population for the next
iteration.
"""
def __init__(
self,
problem: Problem,
options: SciPyDifferentialEvolutionOptions | None = None,
):
options = options or self.default_options()
super().__init__(problem=problem, options=options)
@staticmethod
[docs]
def default_options() -> SciPyDifferentialEvolutionOptions:
"""Returns the default options for the optimiser."""
return SciPyDifferentialEvolutionOptions()
[docs]
def _set_up_optimiser(self):
"""
Parse optimiser options.
"""
self._options_dict = self._options.to_dict()
self._needs_sensitivities = False
# Check bounds
bounds = self.scipy_bounds()
if bounds is None:
raise ValueError("Bounds must be specified for differential_evolution.")
else:
if not (np.isfinite(bounds.lb).all() and np.isfinite(bounds.ub).all()):
raise ValueError("Bounds must be finite for differential_evolution.")
self._options_dict["bounds"] = bounds
# Create logger and evaluator objects
self._logger = Logger(
minimising=self.problem.minimising,
verbose=self.verbose,
verbose_print_rate=self.verbose_print_rate,
)
self._evaluator = ScalarEvaluator(
problem=self._problem,
minimise=True,
with_sensitivities=self._needs_sensitivities,
logger=self._logger,
)
self._set_callback()
# Enable vectorisation. Differential evolution proposes candidates as an
# array of size (N, S) amd expects to receive a set of costs of size (S,)
self._options_dict["updating"] = "deferred"
pop_evaluator = PopulationEvaluator(
problem=self._problem,
minimise=True,
with_sensitivities=self._needs_sensitivities,
logger=self._logger,
)
def map_function(func, positions):
# Use the PopulationEvaluator instead of the ScalarEvaluator for multiprocessing
return pop_evaluator.evaluate(positions)
self._options_dict["workers"] = map_function
[docs]
def _run(self):
"""
Executes the optimisation process using SciPy's differential_evolution function.
Returns
-------
result : scipy.optimize.OptimizeResult
The result of the optimisation including the optimised parameter values and cost.
"""
start_time = time()
# Set counters
self._logger.iteration = 1
result = differential_evolution(
func=self._evaluator.evaluate, **self._options_dict
)
self._logger.iteration -= 1 # undo the final callback
total_time = time() - start_time
# Log the optimised result as the final evaluation
self._evaluator.evaluate(result.x)
return OptimisationResult(
optim=self,
logger=self._logger,
time=total_time,
optim_name=self.name,
message=result.message,
scipy_result=result,
)
@property
[docs]
def name(self):
"""Provides the name of the optimisation strategy."""
return "SciPyDifferentialEvolution"