import warnings
from dataclasses import dataclass
from time import time
import numpy as np
import pints
from pints import PSO, NelderMead, PopulationBasedOptimiser
from pints import Optimiser as PintsOptimiser
from pints import RectangularBoundaries as PintsRectangularBoundaries
from pints import strfloat as PintsStrFloat
from pybop._evaluation import PopulationEvaluator, SequentialEvaluator
from pybop._logging import Logger
from pybop.optimisers._adamw import AdamWImpl
from pybop.optimisers._gradient_descent import GradientDescentImpl
from pybop.optimisers.base_optimiser import (
BaseOptimiser,
OptimisationResult,
OptimiserOptions,
)
from pybop.problems.problem import Problem
[docs]
@dataclass
class PintsOptions(OptimiserOptions):
"""
A class to hold PINTS options for the optimisation process.
Attributes
----------
default_max_iterations : int
Default maximum number of iterations (default: 1000).
max_iterations : int
Maximum number of iterations for the optimisation (default: None).
min_iterations : int
Minimum number of iterations required (default: 2).
max_unchanged_iterations : int
Maximum iterations without improvement before stopping (default: 15).
use_f_guessed : bool
Whether to use guessed function values (default: False).
absolute_tolerance : float
Absolute tolerance for convergence (default: 1e-5).
relative_tolerance : float
Relative tolerance for convergence (default: 1e-2).
max_evaluations : int | None
Maximum number of function evaluations (default: None).
threshold : float | None
Threshold value for optimisation stopping criteria (default: None).
"""
default_max_iterations = 1000
max_iterations: int = default_max_iterations
min_iterations: int = 2
max_unchanged_iterations: int = 15
use_f_guessed: bool = False
absolute_tolerance: float = 1e-5
relative_tolerance: float = 1e-2
max_evaluations: int | None = None
threshold: float | None = None
[docs]
def validate(self):
super().validate()
if self.max_iterations is not None and self.max_iterations < 0:
raise ValueError("Maximum number of iterations cannot be negative.")
if self.min_iterations is not None and self.min_iterations < 0:
raise ValueError("Minimum number of iterations cannot be negative.")
if (
self.max_unchanged_iterations is not None
and self.max_unchanged_iterations < 0
):
raise ValueError(
"Maximum number of unchanged iterations cannot be negative."
)
if self.absolute_tolerance < 0:
raise ValueError("Absolute tolerance cannot be negative.")
if self.relative_tolerance < 0:
raise ValueError("Relative tolerance cannot be negative.")
if (
self.max_iterations is None
and self.max_evaluations is None
and self.threshold is None
and self.max_unchanged_iterations is None
):
raise ValueError(
"At least one stopping criterion must be set: max_iterations, max_evaluations, threshold, or max_unchanged_iterations."
)
[docs]
class BasePintsOptimiser(BaseOptimiser):
"""
A base class for defining optimisation methods from the PINTS library.
Parameters
----------
problem: pybop.Problem
The problem to minimise.
pints_optimiser : pints.Optimiser
The PINTS optimiser class to be used.
options: PintsOptions, optional
Options for the PINTS optimiser. If None, default options are used.
"""
def __init__(
self,
problem: Problem,
pints_optimiser: pints.Optimiser,
options: PintsOptions | None = None,
):
self._pints_optimiser = pints_optimiser
options = options or self.default_options()
super().__init__(problem, options=options)
[docs]
@staticmethod
def default_options() -> PintsOptions:
"""Returns the default options for the PINTS optimiser."""
return PintsOptions()
@property
def max_iterations(self):
"""Returns the maximum number of iterations for the optimisation."""
return self._max_iterations
[docs]
def set_max_iterations(self, iterations: str | int | None = "default"):
"""
Set the maximum number of iterations as a stopping criterion.
Credit: PINTS
Parameters
----------
iterations : int, optional
The maximum number of iterations to run.
Set to `None` to remove this stopping criterion.
"""
if iterations == "default":
iterations = self.default_max_iterations
if iterations is not None:
iterations = int(iterations)
if iterations < 0:
raise ValueError("Maximum number of iterations cannot be negative.")
self._max_iterations = iterations
@property
def optimiser(self) -> pints.Optimiser:
return self._optimiser
[docs]
def _set_up_optimiser(self):
"""
Parse optimiser options and create an instance of the PINTS optimiser.
"""
# First set attributes to default values
options = self._options
self._use_f_guessed = None
self._callback = None
self._parallel = issubclass(self._pints_optimiser, PopulationBasedOptimiser)
self.set_min_iterations(options.min_iterations)
self.set_max_iterations(options.max_iterations)
self._unchanged_max_iterations = options.max_unchanged_iterations
self._absolute_tolerance = options.absolute_tolerance
self._relative_tolerance = options.relative_tolerance
self._use_f_guessed = options.use_f_guessed
self._max_evaluations = options.max_evaluations
self._threshold = options.threshold
self._boundaries = None
# Convert bounds to PINTS boundaries
bounds = self.problem.parameters.get_bounds(transformed=True)
ignored_optimisers = (GradientDescentImpl, AdamWImpl, NelderMead)
if issubclass(self._pints_optimiser, ignored_optimisers):
warnings.warn(
f"NOTE: Boundaries ignored by {self._pints_optimiser}",
stacklevel=2,
)
else:
bounds = self.problem.parameters.get_bounds(transformed=True)
if issubclass(self._pints_optimiser, PSO):
if not all(
np.isfinite(value)
for sublist in bounds.values()
for value in sublist
):
raise ValueError(
f"Either all bounds or no bounds must be set for {self._pints_optimiser.__name__}."
)
self._boundaries = PintsRectangularBoundaries(
bounds["lower"], bounds["upper"]
)
# Set the initial standard deviation / step size parameter
self._std0 = self.problem.parameters.get_std(transformed=True)
# Create an instance of the PINTS optimiser class
if issubclass(self._pints_optimiser, PintsOptimiser):
if issubclass(self._pints_optimiser, PopulationBasedOptimiser):
x0 = self.problem.parameters.get_mean(transformed=True)
else:
x0 = self.problem.parameters.get_initial_values(transformed=True)
if np.isscalar(self._std0):
param_dims = len(self.problem.parameters)
self._std0 = np.ones(param_dims) * self._std0
self._optimiser = self._pints_optimiser(
x0=x0, sigma0=self._std0, boundaries=self._boundaries
)
else:
raise ValueError("The optimiser is not a recognised PINTS optimiser class.")
# Check if sensitivities are required
self._needs_sensitivities = self._optimiser.needs_sensitivities()
# Create logger and evaluator objects
self._logger = Logger(
minimising=self.problem.minimising,
verbose=self.verbose,
verbose_print_rate=self.verbose_print_rate,
)
if self._parallel:
self._evaluator = PopulationEvaluator(
problem=self._problem,
minimise=True,
with_sensitivities=self._needs_sensitivities,
logger=self._logger,
)
else:
self._evaluator = SequentialEvaluator(
problem=self._problem,
minimise=True,
with_sensitivities=self._needs_sensitivities,
logger=self._logger,
)
@property
def evaluator(self) -> PopulationEvaluator | SequentialEvaluator:
return self._evaluator
@property
def name(self):
"""Returns the name of the PINTS optimisation strategy."""
return self._optimiser.name()
[docs]
def _run(self) -> OptimisationResult:
"""
Internal method to run the optimisation using a PINTS optimiser.
Returns
-------
result : OptimisationResult
The result of the optimisation including the optimised parameter values and cost.
See Also
--------
This method is heavily based on the run method in the PINTS.OptimisationController class.
"""
# Timing
start_time = time()
# Check stopping criteria
has_stopping_criterion = False
has_stopping_criterion |= self._max_iterations is not None
has_stopping_criterion |= self._unchanged_max_iterations is not None
has_stopping_criterion |= self._max_evaluations is not None
has_stopping_criterion |= self._threshold is not None
if not has_stopping_criterion:
raise ValueError("At least one stopping criterion must be set.")
# Set counters
self.iteration = 0
unchanged_iterations = 0
# Keep track of current best and best-guess scores.
fb = fg = np.inf
# Keep track of the last significant change
f_sig = np.inf
# Run the ask-and-tell loop
running = True
halt_message = None
try:
while running:
# Update counter
self.iteration += 1
# Ask optimiser for new points
xs = self._optimiser.ask()
# Evaluate points
fs = self._evaluator.evaluate(xs)
# Tell optimiser about function values
self._optimiser.tell(fs)
# Update the scores
fb = self._optimiser.f_best()
fg = self._optimiser.f_guessed()
# Check for significant changes against the absolute and relative tolerance
f_new = fg if self._use_f_guessed else fb
if np.abs(f_new - f_sig) >= np.maximum(
self._absolute_tolerance, self._relative_tolerance * np.abs(f_sig)
):
unchanged_iterations = 0
f_sig = f_new
else:
unchanged_iterations += 1
# Check stopping criteria:
# Maximum number of iterations
if (
self._max_iterations is not None
and self.iteration >= self._max_iterations
):
running = False
halt_message = (
"Maximum number of iterations ("
+ str(self._max_iterations)
+ ") reached."
)
# Maximum number of iterations without significant change
halt = (
self._unchanged_max_iterations is not None
and unchanged_iterations >= self._unchanged_max_iterations
and self.iteration >= self._min_iterations
)
if running and halt:
running = False
halt_message = (
"No significant change for "
+ str(unchanged_iterations)
+ " iterations."
)
# Maximum number of evaluations
if (
self._max_evaluations is not None
and self._logger.evaluations >= self._max_evaluations
):
running = False
halt_message = (
"Maximum number of evaluations ("
+ str(self._max_evaluations)
+ ") reached."
)
# Threshold value
halt = self._threshold is not None and f_new < self._threshold
if running and halt:
running = False
halt_message = (
"Objective function crossed threshold: "
+ str(self._threshold)
+ "."
)
# Error in optimiser
error = self._optimiser.stop()
if error:
running = False
halt_message = str(error)
elif self._callback is not None:
self._callback(self.iteration, self)
except (Exception, SystemExit, KeyboardInterrupt):
# Show last result and exit
print("\n" + "-" * 40)
print("Unexpected termination.")
print("Current score: " + str((fb, fg)))
print("Current position:")
# Show current parameters (with any transformation applied)
for p in self._logger.x_model_best:
print(PintsStrFloat(p))
print("-" * 40)
raise
total_time = time() - start_time
# Get best parameters
if self._use_f_guessed:
x = self._optimiser.x_guessed()
else:
x = self._optimiser.x_best()
# Log the optimised result as the final evaluation
self._evaluator.evaluate([x])
return OptimisationResult(
optim=self,
time=total_time,
method_name=self.name,
message=halt_message,
)
[docs]
def f_guessed_tracking(self):
"""
Check if f_guessed instead of f_best is being tracked.
Credit: PINTS
Returns
-------
bool
True if f_guessed is being tracked, False otherwise.
"""
return self._use_f_guessed
[docs]
def set_f_guessed_tracking(self, use_f_guessed=False):
"""
Set the method used to track the optimiser progress.
Credit: PINTS
Parameters
----------
use_f_guessed : bool, optional
If True, track f_guessed; otherwise, track f_best (default: False).
"""
self._use_f_guessed = bool(use_f_guessed)
[docs]
def set_min_iterations(self, iterations=2):
"""
Set the minimum number of iterations as a stopping criterion.
Parameters
----------
iterations : int, optional
The minimum number of iterations to run (default: 2).
Set to `None` to remove this stopping criterion.
"""
if iterations is not None:
iterations = int(iterations)
if iterations < 0:
raise ValueError("Minimum number of iterations cannot be negative.")
self._min_iterations = iterations
[docs]
def set_max_unchanged_iterations(
self, iterations=15, absolute_tolerance=1e-5, relative_tolerance=1e-2
):
"""
Set the maximum number of iterations without significant change as a stopping criterion.
Credit: PINTS
Parameters
----------
iterations : int, optional
The maximum number of unchanged iterations to run (default: 15).
Set to `None` to remove this stopping criterion.
absolute_tolerance : float, optional
The minimum significant change (absolute tolerance) in the objective function value
that resets the unchanged iteration counter (default: 1e-5).
relative_tolerance : float, optional
The minimum significant proportional change (relative tolerance) in the objective function
value that resets the unchanged iteration counter (default: 1e-2).
"""
if iterations is not None:
iterations = int(iterations)
if iterations < 0:
raise ValueError("Maximum number of iterations cannot be negative.")
absolute_tolerance = float(absolute_tolerance)
if absolute_tolerance < 0:
raise ValueError("Absolute tolerance cannot be negative.")
relative_tolerance = float(relative_tolerance)
if relative_tolerance < 0:
raise ValueError("Relative tolerance cannot be negative.")
self._unchanged_max_iterations = iterations
self._absolute_tolerance = absolute_tolerance
self._relative_tolerance = relative_tolerance
[docs]
def set_max_evaluations(self, evaluations=None):
"""
Set a maximum number of evaluations stopping criterion.
Credit: PINTS
Parameters
----------
evaluations : int, optional
The maximum number of evaluations after which to stop the optimisation
(default: None).
"""
if evaluations is not None:
evaluations = int(evaluations)
if evaluations < 0:
raise ValueError("Maximum number of evaluations cannot be negative.")
self._max_evaluations = evaluations
[docs]
def set_threshold(self, threshold=None):
"""
Adds a stopping criterion, allowing the routine to halt once the
objective function goes below a set ``threshold``.
This criterion is disabled by default, but can be enabled by calling
this method with a valid ``threshold``. To disable it, use
``set_threshold(None)``.
Credit: PINTS
Parameters
----------
threshold : float, optional
The threshold below which the objective function value is considered optimal
(default: None).
"""
if threshold is None:
self._threshold = None
else:
self._threshold = float(threshold)
[docs]
def set_population_size(self, population_size=None):
"""
Set the population size for population-based optimisers, if specified.
"""
if isinstance(self._optimiser, PopulationBasedOptimiser):
self._optimiser.set_population_size(population_size)
@property
def iteration(self):
return self._logger.iteration
@iteration.setter
def iteration(self, value):
self._logger.iteration = value