import copy
import logging
import math
import os
import sys
from collections.abc import Iterable
from importlib.metadata import PackageNotFoundError, version
from pathlib import Path
from textwrap import indent
from typing import Union
import dill
import gpyreg as gpr
import matplotlib.pyplot as plt
import numpy as np
from pyvbmc.formatting import full_repr, summarize
from pyvbmc.function_logger import FunctionLogger
from pyvbmc.parameter_transformer import ParameterTransformer
from pyvbmc.priors import Prior, SciPy, convert_to_prior
from pyvbmc.stats import kl_div_mvn
from pyvbmc.timer import main_timer as timer
from pyvbmc.variational_posterior import VariationalPosterior
from pyvbmc.whitening import warp_gp_and_vp, warp_input
from .active_sample import active_sample
from .gaussian_process_train import reupdate_gp, train_gp
from .iteration_history import IterationHistory
from .options import Options
from .variational_optimization import optimize_vp, update_K
class VBMC:
"""
Posterior and model inference via Variational Bayesian Monte Carlo (VBMC).
VBMC computes a variational approximation of the full posterior and a lower
bound on the normalization constant (marginal likelihood or model evidence)
for a provided unnormalized log posterior.
Initialize a ``VBMC`` object to set up the inference problem, then run
``optimize()``. See the examples for more details.
Parameters
----------
log_density : callable
A given target log-posterior or log-likelihood. If ``log_prior`` is
``None``, ``log_density`` accepts input ``x`` and returns the value of
the target log-joint, that is, the unnormalized log-posterior density
at ``x``. If ``log_prior`` is not ``None``, ``log_density`` should
return the unnormalized log-likelihood. In either case, if
``options["specify_target_noise"]`` is true, ``log_density`` should
return a tuple where the first element is the noisy log-density, and
the second is an estimate of the standard deviation of the noise.
x0 : np.ndarray, optional
Starting point for the inference. Ideally ``x0`` is a point in the
proximity of the mode of the posterior. Default is ``None``.
lower_bounds, upper_bounds : np.ndarray, optional
``lower_bounds`` (`LB`) and ``upper_bounds`` (`UB`) define a set
of strict lower and upper bounds for the coordinate vector, `x`, so
that the posterior has support on `LB` < `x` < `UB`.
If scalars, the bound is replicated in each dimension. Use
``None`` for `LB` and `UB` if no bounds exist. Set `LB` [`i`] = -``inf``
and `UB` [`i`] = ``inf`` if the `i`-th coordinate is unbounded (while
other coordinates may be bounded). Note that if `LB` and `UB` contain
unbounded variables, the respective values of `PLB` and `PUB` need to
be specified (see below), by default ``None``.
plausible_lower_bounds, plausible_upper_bounds : np.ndarray, optional
Specifies a set of ``plausible_lower_bounds`` (`PLB`) and
``plausible_upper_bounds`` (`PUB`) such that `LB` < `PLB` < `PUB` < `UB`.
Both `PLB` and `PUB` need to be finite. `PLB` and `PUB` represent a
"plausible" range, which should denote a region of high posterior
probability mass. Among other things, the plausible box is used to
draw initial samples and to set priors over hyperparameters of the
algorithm. When in doubt, we found that setting `PLB` and `PUB` using
the topmost ~68% percentile range of the prior (e.g., mean +/- 1 SD
for a Gaussian prior) works well in many cases (but note that
additional information might afford a better guess). Both are
by default `None`.
options : dict, optional
Additional options can be passed as a dict. Please refer to the
VBMC options page for the default options. If no ``options`` are
passed, the default options are used.
options_path : pathlike, optional
Additional options can also be specified by a `.ini` file. The
``options`` `dict` keyword takes precedence over options set in the
`.ini` file. ``options_path`` should be either absolute or relative to
the `pyvbmc/vbmc/` directory.
prior : pyvbmc.priors.Prior, scipy.stats distribution, or list, optional
An optional separate prior. It can be a ``pyvbmc.priors.Prior``
subclass, an appropriate ``scipy.stats`` distribution, or a list of
one-dimensional PyVBMC ``Prior`` and/or one-dimensional continuous
``scipy.stats`` distributions. (see the documentation on priors for
more details). If ``prior`` is not `None`, the argument ``log_density``
is assumed to represent the log-likelihood (otherwise it is assumed to
represent the log-joint).
log_prior : callable, optional
An optional separate log-prior function, which should accept a single
argument `x` and return the log-density of the prior at `x`. If
``log_prior`` is not ``None``, the argument ``log_density`` is assumed
to represent the log-likelihood (otherwise it is assumed to represent
the log-joint).
sample_prior : callable, optional
An optional function which accepts a single argument `n` and returns an
array of samples from the prior, of shape `(n, D)`, where `D` is the
problem dimension. Currently unused.
Raises
------
ValueError
When neither `x0` nor (`plausible_lower_bounds` and
`plausible_upper_bounds`) are specified.
ValueError
When various checks for the bounds (LB, UB, PLB, PUB) of VBMC fail.
References
----------
.. [1] Huggins, B., Li, C., Tobaben, M., Aarnos, M., & Acerbi, L. (2023).
"PyVBMC: Efficient Bayesian inference in Python". Journal of Open
Source Software 8(86), 5428, https://doi.org/10.21105/joss.05428.
.. [2] Acerbi, L. (2018). "Variational Bayesian Monte Carlo". In Advances
in Neural Information Processing Systems 31 (NeurIPS 2018), pp. 8213-8223.
.. [3] Acerbi, L. (2020). "Variational Bayesian Monte Carlo with Noisy
Likelihoods". In Advances in Neural Information Processing Systems 33
(NeurIPS 2020).
Examples
--------
For `VBMC` usage examples, please look up the Jupyter notebook tutorials
in the PyVBMC documentation:
https://acerbilab.github.io/pyvbmc/_examples/pyvbmc_example_1.html
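
A minimal usage sketch (the Gaussian target below is purely
illustrative, not part of the library):

>>> import numpy as np
>>> import scipy.stats as scs
>>> from pyvbmc import VBMC
>>> D = 2
>>> def log_density(x):
...     return scs.multivariate_normal.logpdf(x, mean=np.zeros(D))
>>> vbmc = VBMC(
...     log_density,
...     x0=np.zeros((1, D)),
...     plausible_lower_bounds=np.full((1, D), -2.0),
...     plausible_upper_bounds=np.full((1, D), 2.0),
... )
>>> vp, results = vbmc.optimize()  # doctest: +SKIP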
"""
def __init__(
self,
log_density: callable,
x0: np.ndarray = None,
lower_bounds: np.ndarray = None,
upper_bounds: np.ndarray = None,
plausible_lower_bounds: np.ndarray = None,
plausible_upper_bounds: np.ndarray = None,
options: dict = None,
options_path: Union[os.PathLike, None] = None,
prior: Prior = None,
log_prior: callable = None,
sample_prior: callable = None,
):
# set up root logger (only changes stuff if not initialized yet)
logging.basicConfig(stream=sys.stdout, format="%(message)s")
# Initialize variables and algorithm structures
if x0 is None:
if (
plausible_lower_bounds is None
or plausible_upper_bounds is None
):
raise ValueError(
"""vbmc:UnknownDims If no starting point is
provided, PLB and PUB need to be specified."""
)
else:
x0 = np.full(plausible_lower_bounds.shape, np.nan)
if x0.ndim == 1:
logging.warning("Reshaping x0 to row vector.")
x0 = x0.reshape((1, -1))
self.D = x0.shape[1]
# load basic and advanced options and validate the names
basic_options_path = "option_configs/basic_vbmc_options.ini"
self.options = Options(
basic_options_path,
evaluation_parameters={"D": self.D},
user_options=options,
)
advanced_options_path = "option_configs/advanced_vbmc_options.ini"
self.options.load_options_file(
advanced_options_path,
evaluation_parameters={"D": self.D},
)
self.options.update_defaults()
if options_path is not None:
# Update with user-specified config file
self.options.load_options_file(
options_path,
evaluation_parameters={"D": self.D},
)
self.options.validate_option_names(
[basic_options_path, advanced_options_path, options_path]
)
else:
self.options.validate_option_names(
[basic_options_path, advanced_options_path]
)
# Create an initial logger for initialization messages:
self.logger = self._init_logger("_init")
# variable to keep track of logging actions
self.logging_action = []
# Empty LB and UB are Infs
if lower_bounds is None:
lower_bounds = np.full((1, self.D), -np.inf)
if upper_bounds is None:
upper_bounds = np.full((1, self.D), np.inf)
# Check/fix boundaries and starting points
(
self.x0,
self.lower_bounds,
self.upper_bounds,
self.plausible_lower_bounds,
self.plausible_upper_bounds,
) = self._bounds_check(
x0,
lower_bounds,
upper_bounds,
plausible_lower_bounds,
plausible_upper_bounds,
)
# starting point
if not np.all(np.isfinite(self.x0)):
# print('Initial starting point is invalid or not provided.
# Starting from center of plausible region.\n');
self.x0 = 0.5 * (
self.plausible_lower_bounds + self.plausible_upper_bounds
)
# Initialize transformation to unbounded parameters
self.parameter_transformer = ParameterTransformer(
self.D,
self.lower_bounds,
self.upper_bounds,
self.plausible_lower_bounds,
self.plausible_upper_bounds,
transform_type=self.options["bounded_transform"],
)
# Initialize variational posterior
self.vp = VariationalPosterior(
D=self.D,
K=self.options.get("k_warmup"),
x0=self.x0,
parameter_transformer=self.parameter_transformer,
)
if not self.options.get("warmup"):
self.vp.optimize_mu = self.options.get("variable_means")
self.vp.optimize_weights = self.options.get("variable_weights")
# The underlying Gaussian process which corresponds to current vp
self.gp = None
# For storing auxiliary info related to GP hyperparameters
self.hyp_dict = {}
# Optimization of vbmc starts from iteration 0
self.iteration = -1
# Whether the optimization has finished
self.is_finished = False
self.optim_state = self._init_optim_state()
(
self.log_joint,
self.log_likelihood,
self.prior,
) = self._init_log_joint(log_density, prior, log_prior, sample_prior)
self.function_logger = FunctionLogger(
fun=self.log_joint,
D=self.D,
noise_flag=self.optim_state.get("uncertainty_handling_level") > 0,
uncertainty_handling_level=self.optim_state.get(
"uncertainty_handling_level"
),
cache_size=self.options.get("cache_size"),
parameter_transformer=self.parameter_transformer,
)
self.x0 = self.parameter_transformer(self.x0)
self.random_state = np.random.get_state()
self.iteration_history = IterationHistory(
[
"r_index",
"elcbo_impro",
"stable",
"elbo",
"vp",
"warmup",
"iter",
"elbo_sd",
"lcb_max",
"data_trim_list",
"gp",
"gp_hyp_full",
"Ns_gp",
"timer",
"optim_state",
"sKL",
"sKL_true",
"pruned",
"var_ss",
"func_count",
"n_eff",
"logging_action",
# For resuming optimization from a specific iteration, mostly
# useful for debugging
"function_logger",
"random_state",
]
)
def _bounds_check(
self,
x0: np.ndarray,
lower_bounds: np.ndarray,
upper_bounds: np.ndarray,
plausible_lower_bounds: np.ndarray = None,
plausible_upper_bounds: np.ndarray = None,
):
"""
Private function to do the initial check of the VBMC bounds.
"""
N0, D = x0.shape
if plausible_lower_bounds is None or plausible_upper_bounds is None:
if N0 > 1:
self.logger.warning(
"PLB and/or PUB not specified. Estimating "
"plausible bounds from starting set X0..."
)
width = x0.max(0) - x0.min(0)
if plausible_lower_bounds is None:
plausible_lower_bounds = x0.min(0) - width / N0
plausible_lower_bounds = np.maximum(
plausible_lower_bounds, lower_bounds
)
if plausible_upper_bounds is None:
plausible_upper_bounds = x0.max(0) + width / N0
plausible_upper_bounds = np.minimum(
plausible_upper_bounds, upper_bounds
)
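# (Heuristic: expand the observed range of X0 by width / N0 on each
# side, then clip to the hard bounds.)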
idx = plausible_lower_bounds == plausible_upper_bounds
if np.any(idx):
plausible_lower_bounds[idx] = lower_bounds[idx]
plausible_upper_bounds[idx] = upper_bounds[idx]
self.logger.warning(
"vbmc:pbInitFailed: Some plausible bounds could not be "
"determined from starting set. Using hard upper/lower"
" bounds for those instead."
)
else:
self.logger.warning(
"vbmc:pbUnspecified: Plausible lower/upper bounds PLB and"
"/or PUB not specified and X0 is not a valid starting set. "
"Using hard upper/lower bounds instead."
)
if plausible_lower_bounds is None:
plausible_lower_bounds = np.copy(lower_bounds)
if plausible_upper_bounds is None:
plausible_upper_bounds = np.copy(upper_bounds)
# Try to reshape bounds to row vectors
lower_bounds = np.atleast_1d(lower_bounds)
upper_bounds = np.atleast_1d(upper_bounds)
plausible_lower_bounds = np.atleast_1d(plausible_lower_bounds)
plausible_upper_bounds = np.atleast_1d(plausible_upper_bounds)
try:
if lower_bounds.shape != (1, D):
logging.warning("Reshaping lower bounds to (1, %d).", D)
lower_bounds = lower_bounds.reshape((1, D))
if upper_bounds.shape != (1, D):
logging.warning("Reshaping upper bounds to (1, %d).", D)
upper_bounds = upper_bounds.reshape((1, D))
if plausible_lower_bounds.shape != (1, D):
logging.warning(
"Reshaping plausible lower bounds to (1, %d).", D
)
plausible_lower_bounds = plausible_lower_bounds.reshape((1, D))
if plausible_upper_bounds.shape != (1, D):
logging.warning(
"Reshaping plausible upper bounds to (1, %d).", D
)
plausible_upper_bounds = plausible_upper_bounds.reshape((1, D))
except ValueError as exc:
raise ValueError(
f"Bounds must match problem dimension D={D}."
) from exc
# check that plausible bounds are finite
if np.any(np.invert(np.isfinite(plausible_lower_bounds))) or np.any(
np.invert(np.isfinite(plausible_upper_bounds))
):
raise ValueError(
"Plausible interval bounds PLB and PUB need to be finite."
)
# Test that all vectors are real-valued
if (
np.any(np.invert(np.isreal(x0)))
or np.any(np.invert(np.isreal(lower_bounds)))
or np.any(np.invert(np.isreal(upper_bounds)))
or np.any(np.invert(np.isreal(plausible_lower_bounds)))
or np.any(np.invert(np.isreal(plausible_upper_bounds)))
):
raise ValueError(
"""All input vectors (x0, lower_bounds, upper_bounds,
plausible_lower_bounds, plausible_upper_bounds), if specified,
need to be real valued."""
)
# Cast all vectors to floats
# (integer_vars are represented as floats but handled separately).
if np.issubdtype(x0.dtype, np.integer):
logging.warning("Casting initial points to floating point.")
x0 = x0.astype(np.float64)
if np.issubdtype(lower_bounds.dtype, np.integer):
logging.warning("Casting lower bounds to floating point.")
lower_bounds = lower_bounds.astype(np.float64)
if np.issubdtype(upper_bounds.dtype, np.integer):
logging.warning("Casting upper bounds to floating point.")
upper_bounds = upper_bounds.astype(np.float64)
if np.issubdtype(plausible_lower_bounds.dtype, np.integer):
logging.warning(
"Casting plausible lower bounds to floating point."
)
plausible_lower_bounds = plausible_lower_bounds.astype(np.float64)
if np.issubdtype(plausible_upper_bounds.dtype, np.integer):
logging.warning(
"Casting plausible upper bounds to floating point."
)
plausible_upper_bounds = plausible_upper_bounds.astype(np.float64)
# Fixed variables (all bounds equal) are not supported
fixidx = (
(lower_bounds == upper_bounds)
& (upper_bounds == plausible_lower_bounds)
& (plausible_lower_bounds == plausible_upper_bounds)
)
if np.any(fixidx):
raise ValueError(
"""vbmc:FixedVariables VBMC does not support fixed
variables. Lower and upper bounds should be different."""
)
# Test that plausible bounds are different
if np.any(plausible_lower_bounds == plausible_upper_bounds):
raise ValueError(
"""vbmc:MatchingPB:For all variables,
plausible lower and upper bounds need to be distinct."""
)
# Check that all X0 are inside the bounds
if np.any(x0 < lower_bounds) or np.any(x0 > upper_bounds):
raise ValueError(
"""vbmc:InitialPointsNotInsideBounds: The starting
points X0 are not inside the provided hard bounds LB and UB."""
)
# % Compute "effective" bounds (slightly inside provided hard bounds)
bounds_range = upper_bounds - lower_bounds
bounds_range[np.isinf(bounds_range)] = 1e3
scale_factor = 1e-3
realmin = sys.float_info.min
LB_eff = lower_bounds + scale_factor * bounds_range
LB_eff[np.abs(lower_bounds) <= realmin] = (
scale_factor * bounds_range[np.abs(lower_bounds) <= realmin]
)
UB_eff = upper_bounds - scale_factor * bounds_range
UB_eff[np.abs(upper_bounds) <= realmin] = (
-scale_factor * bounds_range[np.abs(upper_bounds) <= realmin]
)
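# Example (illustrative): with lower_bounds = 0.0 and upper_bounds = 10.0,
# bounds_range = 10 and scale_factor = 1e-3, so LB_eff = 0.01 and
# UB_eff = 9.99 (LB_eff comes from the realmin branch here, since
# |lower_bounds| <= realmin).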
# Infinities stay the same
LB_eff[np.isinf(lower_bounds)] = lower_bounds[np.isinf(lower_bounds)]
UB_eff[np.isinf(upper_bounds)] = upper_bounds[np.isinf(upper_bounds)]
if np.any(LB_eff >= UB_eff):
raise ValueError(
"""vbmc:StrictBoundsTooClose: Hard bounds LB and UB
are numerically too close. Make them more separate."""
)
# Fix when provided X0 are almost on the bounds -- move them inside
if np.any(x0 < LB_eff) or np.any(x0 > UB_eff):
self.logger.warning(
"vbmc:InitialPointsTooClosePB: The starting points X0 are on "
"or numerically too close to the hard bounds LB and UB. "
"Moving the initial points more inside..."
)
x0 = np.maximum((np.minimum(x0, UB_eff)), LB_eff)
# Test order of bounds (permissive)
ordidx = (
(lower_bounds <= plausible_lower_bounds)
& (plausible_lower_bounds < plausible_upper_bounds)
& (plausible_upper_bounds <= upper_bounds)
)
if np.any(np.invert(ordidx)):
raise ValueError(
"""vbmc:StrictBounds: For each variable, hard and
plausible bounds should respect the ordering LB < PLB < PUB < UB."""
)
# Test that plausible bounds are reasonably separated from hard bounds
if np.any(LB_eff > plausible_lower_bounds) or np.any(
plausible_upper_bounds > UB_eff
):
self.logger.warning(
"vbmc:TooCloseBounds: For each variable, hard "
"and plausible bounds should not be too close. "
"Moving plausible bounds."
)
plausible_lower_bounds = np.maximum(plausible_lower_bounds, LB_eff)
plausible_upper_bounds = np.minimum(plausible_upper_bounds, UB_eff)
# Check that all X0 are inside the plausible bounds,
# move bounds otherwise
if np.any(x0 <= plausible_lower_bounds) or np.any(
x0 >= plausible_upper_bounds
):
self.logger.warning(
"vbmc:InitialPointsOutsidePB. The starting points X0"
" are not inside the provided plausible bounds PLB and "
"PUB. Expanding the plausible bounds..."
)
plausible_lower_bounds = np.minimum(
plausible_lower_bounds, x0.min(0)
)
plausible_upper_bounds = np.maximum(
plausible_upper_bounds, x0.max(0)
)
# Test order of bounds
ordidx = (
(lower_bounds < plausible_lower_bounds)
& (plausible_lower_bounds < plausible_upper_bounds)
& (plausible_upper_bounds < upper_bounds)
)
if np.any(np.invert(ordidx)):
raise ValueError(
"""vbmc:StrictBounds: For each variable, hard and
plausible bounds should respect the ordering LB < PLB < PUB < UB."""
)
# Check that variables are either bounded or unbounded
# (not half-bounded)
if np.any(
(np.isfinite(lower_bounds) & np.isinf(upper_bounds))
| (np.isinf(lower_bounds) & np.isfinite(upper_bounds))
):
raise ValueError(
"""vbmc:HalfBounds: Each variable needs to be unbounded or
bounded. Variables bounded only below/above are not supported."""
)
return (
x0,
lower_bounds,
upper_bounds,
plausible_lower_bounds,
plausible_upper_bounds,
)
def _init_optim_state(self):
"""
A private function to init the optim_state dict that contains
information about VBMC variables.
"""
# Record starting points (original coordinates)
y_orig = np.array(self.options.get("f_vals")).ravel()
if len(y_orig) == 0:
y_orig = np.full([self.x0.shape[0]], np.nan)
if len(self.x0) != len(y_orig):
raise ValueError(
"""vbmc:MismatchedStartingInputs The number of
points in X0 and of their function values as specified in
self.options.f_vals are not the same."""
)
optim_state = {}
optim_state["cache"] = {}
optim_state["cache"]["x_orig"] = self.x0
optim_state["cache"]["y_orig"] = y_orig
# Does the starting cache contain function values?
optim_state["cache_active"] = np.any(
np.isfinite(optim_state.get("cache").get("y_orig"))
)
# Integer variables
optim_state["integer_vars"] = np.full(self.D, False)
if len(self.options.get("integer_vars")) > 0:
integeridx = self.options.get("integer_vars") != 0
optim_state["integer_vars"][integeridx] = True
if (
np.any(np.isinf(self.lower_bounds[:, integeridx]))
or np.any(np.isinf(self.upper_bounds[:, integeridx]))
or np.any(self.lower_bounds[:, integeridx] % 1 != 0.5)
or np.any(self.upper_bounds[:, integeridx] % 1 != 0.5)
):
raise ValueError(
"""Hard bounds of integer variables need to be
set at +/- 0.5 points from their boundary values (e.g., -0.5
and 10.5 for a variable that takes values from 0 to 10)."""
)
# fprintf('Index of variable restricted to integer values: %s.\n'
optim_state["lb_orig"] = self.lower_bounds.copy()
optim_state["ub_orig"] = self.upper_bounds.copy()
optim_state["plb_orig"] = self.plausible_lower_bounds.copy()
optim_state["pub_orig"] = self.plausible_upper_bounds.copy()
eps_orig = (self.upper_bounds - self.lower_bounds) * self.options.get(
"tol_bound_x"
)
# inf - inf raises warning in numpy, but output is correct
with np.errstate(invalid="ignore"):
optim_state["lb_eps_orig"] = self.lower_bounds + eps_orig
optim_state["ub_eps_orig"] = self.upper_bounds - eps_orig
# Transform variables (Transform of lower bounds and upper bounds can
# create warning but we are aware of this and output is correct)
with np.errstate(divide="ignore"):
optim_state["lb_tran"] = self.parameter_transformer(
self.lower_bounds
)
optim_state["ub_tran"] = self.parameter_transformer(
self.upper_bounds
)
optim_state["plb_tran"] = self.parameter_transformer(
self.plausible_lower_bounds
)
optim_state["pub_tran"] = self.parameter_transformer(
self.plausible_upper_bounds
)
# Before first iteration
# Iterations are from 0 onwards in optimize so we should have -1
# here. In MATLAB this was 0.
optim_state["iter"] = -1
# Estimate of GP observation noise around the high posterior
# density region
optim_state["sn2_hpd"] = np.inf
# When was the last warping action performed (number of iterations)
optim_state["last_warping"] = -np.inf
# When was the last warping action performed and not undone
# (number of iterations)
optim_state["last_successful_warping"] = -np.inf
# Number of warpings performed
optim_state["warping_count"] = 0
# When GP hyperparameter sampling is switched with optimization
if self.options.get("ns_gp_max") > 0:
optim_state["stop_sampling"] = 0
else:
optim_state["stop_sampling"] = np.Inf
# Fully recompute variational posterior
optim_state["recompute_var_post"] = True
# Start with warm-up?
optim_state["warmup"] = self.options.get("warmup")
if self.options.get("warmup"):
optim_state["last_warmup"] = np.inf
else:
optim_state["last_warmup"] = 0
# Number of stable function evaluations during warmup
# with small increment
optim_state["warmup_stable_count"] = 0
# Proposal function for search
if self.options.get("proposal_fcn") is None:
optim_state["proposal_fcn"] = "@(x)proposal_vbmc"
else:
optim_state["proposal_fcn"] = self.options.get("proposal_fcn")
# Quality of the variational posterior
optim_state["R"] = np.inf
# Start with adaptive sampling
optim_state["skip_active_sampling"] = False
# Running mean and covariance of variational posterior
# in transformed space
optim_state["run_mean"] = []
optim_state["run_cov"] = []
# Last time running average was updated
optim_state["last_run_avg"] = np.NaN
# Current number of components for variational posterior
optim_state["vp_K"] = self.options.get("k_warmup")
# Number of variational components pruned in last iteration
optim_state["pruned"] = 0
# Need to switch from deterministic entropy to stochastic entropy
optim_state["entropy_switch"] = self.options.get("entropy_switch")
# Only use deterministic entropy if D larger than a fixed number
if self.D < self.options.get("det_entropy_min_d"):
optim_state["entropy_switch"] = False
# Tolerance threshold on GP variance (used by some acquisition fcns)
optim_state["tol_gp_var"] = self.options.get("tol_gp_var")
# Copy maximum number of fcn. evaluations,
# used by some acquisition fcns.
optim_state["max_fun_evals"] = self.options.get("max_fun_evals")
# By default, apply variance-based regularization
# to acquisition functions
optim_state["variance_regularized_acqfcn"] = True
# Setup search cache
optim_state["search_cache"] = []
# Set uncertainty handling level
# (0: none; 1: unknown noise level; 2: user-provided noise)
if self.options.get("specify_target_noise"):
optim_state["uncertainty_handling_level"] = 2
elif len(self.options.get("uncertainty_handling")) > 0:
optim_state["uncertainty_handling_level"] = 1
else:
optim_state["uncertainty_handling_level"] = 0
# Empty hedge struct for acquisition functions
if self.options.get("acq_hedge"):
optim_state["hedge"] = []
# List of points at the end of each iteration
optim_state["iter_list"] = {}
optim_state["iter_list"]["u"] = []
optim_state["iter_list"]["f_val"] = []
optim_state["iter_list"]["f_sd"] = []
optim_state["iter_list"]["fhyp"] = []
# Deterministic entropy approximation lower/upper factor
optim_state["entropy_alpha"] = self.options.get("det_entropy_alpha")
# Repository of variational solutions (not used in Python)
# optim_state["vp_repo"] = []
# Repeated measurement streak
optim_state["repeated_observations_streak"] = 0
# List of data trimming events
optim_state["data_trim_list"] = []
# Expanding search bounds
prange = optim_state.get("pub_tran") - optim_state.get("plb_tran")
optim_state["lb_search"] = np.maximum(
optim_state.get("plb_tran")
- prange * self.options.get("active_search_bound"),
optim_state.get("lb_tran"),
)
optim_state["ub_search"] = np.minimum(
optim_state.get("pub_tran")
+ prange * self.options.get("active_search_bound"),
optim_state.get("ub_tran"),
)
# Initialize Gaussian process settings
# Squared exponential kernel with separate length scales
optim_state["gp_cov_fun"] = 1
if optim_state.get("uncertainty_handling_level") == 0:
# Observation noise for stability
optim_state["gp_noise_fun"] = [1, 0, 0]
elif optim_state.get("uncertainty_handling_level") == 1:
# Infer noise
optim_state["gp_noise_fun"] = [1, 2, 0]
elif optim_state.get("uncertainty_handling_level") == 2:
# Provided heteroskedastic noise
optim_state["gp_noise_fun"] = [1, 1, 0]
if (
self.options.get("noise_shaping")
and optim_state["gp_noise_fun"][1] == 0
):
optim_state["gp_noise_fun"][1] = 1
optim_state["gp_mean_fun"] = self.options.get("gp_mean_fun")
valid_gp_mean_funs = [
"zero",
"const",
"negquad",
"se",
"negquadse",
"negquadfixiso",
"negquadfix",
"negquadsefix",
"negquadonly",
"negquadfixonly",
"negquadlinonly",
"negquadmix",
]
if optim_state["gp_mean_fun"] not in valid_gp_mean_funs:
raise ValueError(
"""vbmc:UnknownGPmean:Unknown/unsupported GP mean
function. Supported mean functions are zero, const,
egquad, and se"""
)
optim_state["int_mean_fun"] = self.options.get("gp_int_mean_fun")
# more logic here in matlab
# Starting threshold on y for output warping
if self.options.get("fitness_shaping"):
optim_state["out_warp_delta"] = self.options.get(
"out_warp_thresh_base"
)
else:
optim_state["out_warp_delta"] = []
return optim_state
def optimize(self):
"""
Run inference on an initialized ``VBMC`` object.
VBMC computes a variational approximation of the full posterior and the
ELBO (evidence lower bound), a lower bound on the log normalization
constant (log marginal likelihood or log model evidence) for the provided
unnormalized log posterior.
Returns
-------
vp : VariationalPosterior
The ``VariationalPosterior`` computed by VBMC.
results : dict
A dictionary with additional information about the VBMC run.
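
Examples
--------
A minimal sketch, assuming ``vbmc`` is an initialized ``VBMC`` object:

>>> vp, results = vbmc.optimize()  # doctest: +SKIP
>>> samples, __ = vp.sample(1000)  # doctest: +SKIP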
"""
# Initialize main logger with potentially new options:
self.logger = self._init_logger()
# set up strings for logging of the iteration
display_format = self._setup_logging_display_format()
if self.optim_state["uncertainty_handling_level"] > 0:
self.logger.info(
"Beginning variational optimization assuming NOISY observations"
" of the log-joint"
)
else:
self.logger.info(
"Beginning variational optimization assuming EXACT observations"
" of the log-joint."
)
if self.is_finished:
self.logger.warning("Continuing optimization from previous state.")
self.is_finished = False
self.vp = self.iteration_history["vp"][-1]
self.optim_state = self.iteration_history["optim_state"][-1]
self._log_column_headers()
while not self.is_finished:
self.iteration += 1
# Reset timer:
timer.reset()
self.optim_state["iter"] = self.iteration
self.optim_state["redo_roto_scaling"] = False
vp_old = copy.deepcopy(self.vp)
self.logging_action = []
if self.iteration == 0 and self.optim_state["warmup"]:
self.logging_action.append("start warm-up")
# Switch to stochastic entropy towards the end if still
# deterministic.
if self.optim_state.get("entropy_switch") and (
self.function_logger.func_count
>= self.optim_state.get("entropy_force_switch")
* self.optim_state.get("max_fun_evals")
):
self.optim_state["entropy_switch"] = False
self.logging_action.append("entropy switch")
## Input warping / reparameterization
if self.options["incremental_warp_delay"]:
WarpDelay = self.options["warp_every_iters"] * np.max(
[1, self.optim_state["warping_count"]]
)
else:
WarpDelay = self.options["warp_every_iters"]
doWarping = (
(
self.options.get("warp_rotoscaling")
or self.options.get("warp_nonlinear")
)
and (self.iteration > 0)
and (not self.optim_state["warmup"])
and (
self.iteration - self.optim_state["last_warping"]
> WarpDelay
)
and (self.vp.K >= self.options["warp_min_k"])
and (
self.iteration_history["r_index"][self.iteration - 1]
< self.options["warp_tol_reliability"]
)
and (self.vp.D > 1)
)
if doWarping:
timer.start_timer("warping")
vp_tmp, __, __, __ = self.determine_best_vp()
vp_tmp = copy.deepcopy(vp_tmp)
# Store variables in case warp needs to be undone:
# (vp_old copied above)
optim_state_old = copy.deepcopy(self.optim_state)
gp_old = copy.deepcopy(self.gp)
function_logger_old = copy.deepcopy(self.function_logger)
elbo_old = self.iteration_history["elbo"][-1]
elbo_sd_old = self.iteration_history["elbo_sd"][-1]
hyp_dict_old = copy.deepcopy(self.hyp_dict)
# Compute and apply whitening transform:
(
parameter_transformer_warp,
self.optim_state,
self.function_logger,
warp_action,
) = warp_input(
vp_tmp,
self.optim_state,
self.function_logger,
self.options,
)
self.vp, self.hyp_dict["hyp"] = warp_gp_and_vp(
parameter_transformer_warp, self.gp, self.vp, self
)
self.logging_action.append(warp_action)
timer.stop_timer("warping")
if self.options.get("warp_undo_check"):
## Train gp
timer.start_timer("gp_train")
self.gp, Ns_gp, sn2_hpd, self.hyp_dict = train_gp(
self.hyp_dict,
self.optim_state,
self.function_logger,
self.iteration_history,
self.options,
self.optim_state["plb_tran"],
self.optim_state["pub_tran"],
)
self.optim_state["sn2_hpd"] = sn2_hpd
timer.stop_timer("gp_train")
## Optimize variational parameters
timer.start_timer("variational_fit")
if not self.vp.optimize_mu:
# Variational components fixed to training inputs
self.vp.mu = self.gp.X.T
Knew = self.vp.mu.shape[1]
else:
# Update number of variational mixture components
Knew = self.vp.K
# Decide number of fast/slow optimizations
N_fastopts = math.ceil(
self.options.eval("ns_elbo", {"K": self.vp.K})
)
N_slowopts = self.options.get(
"elbo_starts"
) # Full optimizations.
# Run optimization of variational parameters
self.vp, var_ss, pruned = optimize_vp(
self.options,
self.optim_state,
self.vp,
self.gp,
N_fastopts,
N_slowopts,
Knew,
)
self.optim_state["vp_K"] = self.vp.K
# Save current entropy
self.optim_state["H"] = self.vp.stats["entropy"]
# Get real variational posterior (might differ from training posterior)
# vp_real = vp.vptrain2real(0, self.options)
vp_real = self.vp
elbo = vp_real.stats["elbo"]
elbo_sd = vp_real.stats["elbo_sd"]
timer.stop_timer("variational_fit")
# Keep warping only if it substantially improves ELBO
# and uncertainty does not blow up too much
if (
elbo
< (elbo_old + self.options["warp_tol_improvement"])
) or (
elbo_sd
> (
elbo_sd_old
* self.options["warp_tol_sd_multiplier"]
+ self.options["warp_tol_sd_base"]
)
):
# Undo input warping:
self.vp = vp_old
self.gp = gp_old
self.optim_state = optim_state_old
self.function_logger = function_logger_old
self.hyp_dict = hyp_dict_old
# Still keep track of failed warping (failed warp counts twice)
self.optim_state["warping_count"] += 2
self.optim_state["last_warping"] = self.optim_state[
"iter"
]
self.logging_action.append("undo " + warp_action)
## Actively sample new points into the training set
timer.start_timer("active_sampling")
self.parameter_transformer = self.vp.parameter_transformer
self.function_logger.parameter_transformer = (
self.parameter_transformer
)
if self.iteration == 0:
new_funevals = self.options.get("fun_eval_start")
else:
new_funevals = self.options.get("fun_evals_per_iter")
# Careful with Xn, in MATLAB this condition is > 0
# due to 1-based indexing.
if self.function_logger.Xn >= 0:
self.function_logger.y_max = np.max(
self.function_logger.y[self.function_logger.X_flag]
)
if self.optim_state.get("skip_active_sampling"):
self.optim_state["skip_active_sampling"] = False
else:
if (
self.gp is not None
and self.options.get("separate_search_gp")
and not self.options.get("varactivesample")
):
# Train a distinct GP for active sampling
# Since we are doing iterations from 0 onwards
# instead of from 1 onwards, this should be checking
# oddness, not evenness.
if self.iteration % 2 == 1:
meantemp = self.optim_state.get("gp_mean_fun")
self.optim_state["gp_mean_fun"] = "const"
timer.start_timer("separate_gp_train")
gp_search, Ns_gp, sn2_hpd, self.hyp_dict = train_gp(
self.hyp_dict,
self.optim_state,
self.function_logger,
self.iteration_history,
self.options,
self.optim_state["plb_tran"],
self.optim_state["pub_tran"],
)
timer.stop_timer("separate_gp_train")
self.optim_state["sn2_hpd"] = sn2_hpd
self.optim_state["gp_mean_fun"] = meantemp
else:
gp_search = self.gp
else:
gp_search = self.gp
# Perform active sampling
if self.options.get("varactivesample"):
# FIX TIMER HERE IF USING THIS
# [optimState,vp,t_active,t_func] =
# variationalactivesample_vbmc(optimState,new_funevals,
# funwrapper,vp,vp_old,gp_search,options)
sys.exit("Function currently not supported")
else:
self.optim_state["hyp_dict"] = self.hyp_dict
(
self.function_logger,
self.optim_state,
self.vp,
self.gp,
) = active_sample(
gp_search,
new_funevals,
self.optim_state,
self.function_logger,
self.iteration_history,
self.vp,
self.options,
)
self.hyp_dict = self.optim_state["hyp_dict"]
# Number of training inputs
self.optim_state["N"] = self.function_logger.Xn + 1
self.optim_state["n_eff"] = np.sum(
self.function_logger.n_evals[self.function_logger.X_flag]
)
timer.stop_timer("active_sampling")
## Train gp
timer.start_timer("gp_train")
self.gp, Ns_gp, sn2_hpd, self.hyp_dict = train_gp(
self.hyp_dict,
self.optim_state,
self.function_logger,
self.iteration_history,
self.options,
self.optim_state["plb_tran"],
self.optim_state["pub_tran"],
)
self.optim_state["sn2_hpd"] = sn2_hpd
timer.stop_timer("gp_train")
# Check if reached stable sampling regime
if (
Ns_gp == self.options.get("stable_gp_samples")
and self.optim_state.get("stop_sampling") == 0
):
self.optim_state["stop_sampling"] = self.optim_state.get("N")
## Optimize variational parameters
timer.start_timer("variational_fit")
if not self.vp.optimize_mu:
# Variational components fixed to training inputs
self.vp.mu = self.gp.X.T.copy()
Knew = self.vp.mu.shape[1]
else:
# Update number of variational mixture components
Knew = update_K(
self.optim_state, self.iteration_history, self.options
)
# Decide number of fast/slow optimizations
N_fastopts = math.ceil(
self.options.eval("ns_elbo", {"K": self.vp.K})
)
if self.optim_state.get("recompute_var_post") or (
self.options.get("always_refit_vp")
):
# Full optimizations
N_slowopts = self.options.get("elbo_starts")
self.optim_state["recompute_var_post"] = False
else:
# Only incremental change from previous iteration
N_fastopts = math.ceil(
N_fastopts * self.options.get("ns_elbo_incr")
)
N_slowopts = 1
# Run optimization of variational parameters
self.vp, var_ss, pruned = optimize_vp(
self.options,
self.optim_state,
self.vp,
self.gp,
N_fastopts,
N_slowopts,
Knew,
)
self.optim_state["vp_K"] = self.vp.K
# Save current entropy
self.optim_state["H"] = self.vp.stats["entropy"]
# Get real variational posterior (might differ from training posterior)
# vp_real = vp.vptrain2real(0, self.options)
vp_real = self.vp
elbo = vp_real.stats["elbo"]
elbo_sd = vp_real.stats["elbo_sd"]
timer.stop_timer("variational_fit")
# Finalize iteration
timer.start_timer("finalize")
# Compute symmetrized KL-divergence between old and new posteriors
Nkl = int(1e5)
sKL = max(
0,
0.5
* np.sum(
self.vp.kl_div(
vp2=vp_old,
N=Nkl,
gauss_flag=self.options.get("kl_gauss"),
)
),
)
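# Note: kl_div returns the two directed KL divergences, so halving their
# sum gives the symmetrized KL; max(0, ...) guards against small negative
# Monte Carlo estimates.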
# Evaluate max LCB of GP prediction on all training inputs
f_mu, f_s2 = self.gp.predict(
self.gp.X, self.gp.y, self.gp.s2, add_noise=False
)
self.optim_state["lcb_max"] = np.amax(
f_mu - self.options.get("elcbo_impro_weight") * np.sqrt(f_s2)
)
# Compare variational posterior's moments with ground truth
if (
self.options.get("true_mean")
and self.options.get("true_cov")
and np.all(np.isfinite(self.options.get("true_mean")))
and np.all(np.isfinite(self.options.get("true_cov")))
):
mubar_orig, sigma_orig = vp_real.moments(1e6, True, True)
kl = kl_div_mvn(
mubar_orig,
sigma_orig,
self.options.get("true_mean"),
self.options.get("true_cov"),
)
sKL_true = 0.5 * np.sum(kl)
else:
sKL_true = None
# Record moments in transformed space
mubar, sigma = self.vp.moments(orig_flag=False, cov_flag=True)
if (
len(self.optim_state.get("run_mean")) == 0
or len(self.optim_state.get("run_cov")) == 0
):
self.optim_state["run_mean"] = mubar.reshape(1, -1)
self.optim_state["run_cov"] = sigma
self.optim_state["last_run_avg"] = self.optim_state.get("N")
else:
Nnew = self.optim_state.get("N") - self.optim_state.get(
"last_run_avg"
)
wRun = self.options.get("moments_run_weight") ** Nnew
self.optim_state["run_mean"] = wRun * self.optim_state.get(
"run_mean"
) + (1 - wRun) * mubar.reshape(1, -1)
self.optim_state["run_cov"] = (
wRun * self.optim_state.get("run_cov") + (1 - wRun) * sigma
)
self.optim_state["last_run_avg"] = self.optim_state.get("N")
timer.stop_timer("finalize")
# timer.totalruntime = NaN; # update at the end of the iteration timer
iteration_values = {
"iter": self.iteration,
"vp": self.vp,
"elbo": elbo,
"elbo_sd": elbo_sd,
"var_ss": var_ss,
"sKL": sKL,
"sKL_true": sKL_true,
"gp": self.gp,
"gp_hyp_full": self.gp.get_hyperparameters(as_array=True),
"Ns_gp": Ns_gp,
"pruned": pruned,
"timer": timer,
"func_count": self.function_logger.func_count,
"lcb_max": self.optim_state["lcb_max"],
"n_eff": self.optim_state["n_eff"],
"function_logger": self.function_logger,
}
# Record all useful stats
self.iteration_history.record_iteration(
iteration_values,
self.iteration,
)
# Check warmup
if (
self.optim_state.get("iter") > 1
and self.optim_state.get("stop_gp_sampling") == 0
and not self.optim_state.get("warmup")
):
if self._is_gp_sampling_finished():
self.optim_state[
"stop_gp_sampling"
] = self.optim_state.get("N")
# Check termination conditions
(
self.is_finished,
termination_message,
) = self._check_termination_conditions()
# Save stability
self.vp.stats["stable"] = self.iteration_history["stable"][
self.iteration
]
# Check if we are still warming-up
if self.optim_state.get("warmup") and self.iteration > 0:
if self.options.get("recompute_lcb_max"):
self.optim_state[
"lcb_max_vec"
] = self._recompute_lcb_max().T
trim_flag = self._check_warmup_end_conditions()
if trim_flag:
self._setup_vbmc_after_warmup()
# Re-update GP after trimming
self.gp = reupdate_gp(self.function_logger, self.gp)
if not self.optim_state.get("warmup"):
self.vp.optimize_mu = self.options.get("variable_means")
self.vp.optimize_weights = self.options.get(
"variable_weights"
)
# Switch to main algorithm options
# options = options_main
# Reset GP hyperparameter covariance
# hypstruct.runcov = []
self.hyp_dict["runcov"] = None
# Reset VP repository (not used in python)
self.optim_state["vp_repo"] = []
# Re-get acq info
# self.optim_state['acq_info'] = getAcqInfo(
# options.SearchAcqFcn
# )
# Needs to be below the above block since warmup value can change
# in _check_warmup_end_conditions
self.iteration_history.record(
"warmup", self.optim_state.get("warmup"), self.iteration
)
# Check and update fitness shaping / output warping threshold
if (
self.optim_state.get("out_warp_delta") != []
and self.optim_state.get("R") is not None
and (
self.optim_state.get("R")
< self.options.get("warp_tol_reliability")
)
):
Xrnd, _ = self.vp.sample(N=int(2e4), orig_flag=False)
ymu, _ = self.gp.predict(Xrnd, add_noise=True)
ydelta = max(
[0, self.function_logger.y_max - np.quantile(ymu, 1e-3)]
)
if (
ydelta
> self.optim_state.get("out_warp_delta")
* self.options.get("out_warp_thresh_tol")
and self.optim_state.get("R") is not None
and self.optim_state.get("R") < 1
):
self.optim_state["out_warp_delta"] = self.optim_state.get(
"out_warp_delta"
) * self.options.get("out_warp_thresh_mult")
# Write iteration output
# Stopped GP sampling this iteration?
if (
Ns_gp == self.options["stable_gp_samples"]
and self.iteration_history["Ns_gp"][max(0, self.iteration - 1)]
> self.options["stable_gp_samples"]
):
if Ns_gp == 0:
self.logging_action.append("switch to GP opt")
else:
self.logging_action.append("stable GP sampling")
if self.options.get("print_iteration_header") is None:
# Default behavior, try to guess based on plotting options:
reprint_headers = (
self.options.get("plot")
and self.iteration > 0
and "inline" in plt.get_backend()
)
elif self.options["print_iteration_header"]:
# Re-print every iteration after 0th
reprint_headers = self.iteration > 0
else:
# Never re-print headers
reprint_headers = False
# Reprint the headers if desired:
if reprint_headers:
self._log_column_headers()
if self.optim_state["cache_active"]:
self.logger.info(
display_format.format(
self.iteration,
self.function_logger.func_count,
self.function_logger.cache_count,
elbo,
elbo_sd,
sKL,
self.vp.K,
self.optim_state["R"],
", ".join(self.logging_action),
)
)
else:
if (
self.optim_state["uncertainty_handling_level"] > 0
and self.options.get("max_repeated_observations") > 0
):
self.logger.info(
display_format.format(
self.iteration,
self.function_logger.func_count,
self.optim_state["N"],
elbo,
elbo_sd,
sKL,
self.vp.K,
self.optim_state["R"],
", ".join(self.logging_action),
)
)
else:
self.logger.info(
display_format.format(
self.iteration,
self.function_logger.func_count,
elbo,
elbo_sd,
sKL,
self.vp.K,
self.optim_state["R"],
", ".join(self.logging_action),
)
)
self.iteration_history.record(
"logging_action", self.logging_action, self.iteration
)
# Plot iteration
if self.options.get("plot"):
if self.iteration > 0:
previous_gp = self.iteration_history["gp"][
self.iteration - 1
]
# find points that are new in this iteration
# (hacky because numpy only has 1-D set difference)
# future fix: active sampling should return the set of
# indices of the added points
highlight_data = np.array(
[
i
for i, x in enumerate(self.gp.X)
if tuple(x) not in set(map(tuple, previous_gp.X))
]
)
else:
highlight_data = None
if len(self.logging_action) > 0:
title = "VBMC iteration {} ({})".format(
self.iteration, ", ".join(self.logging_action)
)
else:
title = "VBMC iteration {}".format(self.iteration)
self.vp.plot(
plot_data=True,
highlight_data=highlight_data,
plot_vp_centres=True,
title=title,
)
plt.show()
# Record optim_state and random state
self.random_state = np.random.get_state()
self.iteration_history.record_iteration(
{
"optim_state": self.optim_state,
"random_state": self.random_state,
},
self.iteration,
)
# Pick "best" variational solution to return
self.vp, elbo, elbo_sd, idx_best = self.determine_best_vp()
if self.options.get("do_final_boost"):
# Last variational optimization with large number of components
self.vp, elbo, elbo_sd, changed_flag = self.final_boost(
self.vp, self.iteration_history["gp"][idx_best]
)
else:
changed_flag = False
if changed_flag:
# Recompute symmetrized KL-divergence
if "vp_old" in locals():
sKL = max(
0,
0.5
* np.sum(
self.vp.kl_div(
vp2=vp_old,
N=Nkl,
gauss_flag=self.options.get("kl_gauss"),
)
),
)
else:
sKL = -1 # sKL is undefined
if self.options.get("plot"):
self._log_column_headers()
if (
self.optim_state["uncertainty_handling_level"] > 0
and self.options.get("max_repeated_observations") > 0
):
self.logger.info(
display_format.format(
np.inf,
self.function_logger.func_count,
self.optim_state["N"],
elbo,
elbo_sd,
sKL,
self.vp.K,
self.iteration_history.get("r_index")[idx_best],
"finalize",
)
)
else:
self.logger.info(
display_format.format(
np.inf,
self.function_logger.func_count,
elbo,
elbo_sd,
sKL,
self.vp.K,
self.iteration_history.get("r_index")[idx_best],
"finalize",
)
)
# plot final vp:
if self.options.get("plot"):
self.vp.plot(
plot_data=True,
highlight_data=None,
plot_vp_centres=True,
title="VBMC final ({} iterations)".format(self.iteration),
)
plt.show()
# Set exit_flag based on stability
if self.vp.stats["stable"]:
success_flag = True
else:
success_flag = False
# Print final message
self.logger.warning(termination_message)
self.logger.warning(
"Estimated ELBO: {:.3f} +/-{:.3f}.".format(elbo, elbo_sd)
)
if not success_flag:
self.logger.warning(
"Caution: Returned variational solution may have"
" not converged."
)
results = self._create_result_dict(
idx_best, termination_message, success_flag
)
return copy.deepcopy(self.vp), results
def _check_warmup_end_conditions(self):
"""
Private method to check the warmup end conditions.
"""
iteration = self.optim_state.get("iter")
# First requirement for stopping: no consistent improvement of the metric
stable_count_flag = False
stop_warmup_thresh = self.options.get(
"stop_warmup_thresh"
) * self.options.get("fun_evals_per_iter")
tol_stable_warmup_iters = math.ceil(
self.options.get("tol_stable_warmup")
/ self.options.get("fun_evals_per_iter")
)
# MATLAB has +1 on the right side due to different indexing.
if iteration > tol_stable_warmup_iters:
# Vector of ELCBO (ignore first two iterations, ELCBO is unreliable)
elcbo_vec = self.iteration_history.get("elbo") - self.options.get(
"elcbo_impro_weight"
) * self.iteration_history.get("elbo_sd")
# NB: Take care with MATLAB "end" indexing and off-by-one errors:
max_now = np.amax(
elcbo_vec[max(3, len(elcbo_vec) - tol_stable_warmup_iters) :]
)
max_before = np.amax(
elcbo_vec[2 : max(3, len(elcbo_vec) - tol_stable_warmup_iters)]
)
stable_count_flag = (max_now - max_before) < stop_warmup_thresh
# Vector of maximum lower confidence bounds (LCB) of fcn values
lcb_max_vec = self.iteration_history.get("lcb_max")[: iteration + 1]
# Second requirement, also no substantial improvement of max fcn value
# in recent iters (unless already performing BO-like warmup)
if self.options.get("warmup_check_max"):
idx_last = np.full(lcb_max_vec.shape, False)
recent_past = iteration - int(
math.ceil(
self.options.get("tol_stable_warmup")
/ self.options.get("fun_evals_per_iter")
)
+ 1
)
idx_last[max(1, recent_past) :] = True
impro_fcn = max(
0,
np.amax(lcb_max_vec[idx_last])
- np.amax(lcb_max_vec[~idx_last]),
)
else:
impro_fcn = 0
no_recent_improvement_flag = impro_fcn < stop_warmup_thresh
# Alternative criterion for stopping - no improvement over max fcn value
max_thresh = np.amax(lcb_max_vec) - self.options.get("tol_improvement")
idx_1st = np.ravel(np.argwhere(lcb_max_vec > max_thresh))[0]
yy = self.iteration_history.get("func_count")[: iteration + 1]
pos = yy[idx_1st]
currentpos = self.function_logger.func_count
no_longterm_improvement_flag = (currentpos - pos) > self.options.get(
"warmup_no_impro_threshold"
)
if len(self.optim_state.get("data_trim_list")) > 0:
last_data_trim = self.optim_state.get("data_trim_list")[-1]
else:
last_data_trim = -np.inf
no_recent_trim_flag = (
self.optim_state.get("N") - last_data_trim
) >= 10
stop_warmup = (
(stable_count_flag and no_recent_improvement_flag)
or no_longterm_improvement_flag
) and no_recent_trim_flag
return stop_warmup
def _setup_vbmc_after_warmup(self):
"""
Private method to setup multiple vbmc settings after a the warmup has
been determined to be ended. The method whether the warmup ending was
a false alarm and then only prunes.
"""
iteration = self.optim_state.get("iter")
if (
self.iteration_history.get("r_index")[iteration]
< self.options.get("stop_warmup_reliability")
or len(self.optim_state.get("data_trim_list")) >= 1
):
self.optim_state["warmup"] = False
self.logging_action.append("end warm-up")
threshold = self.options.get("warmup_keep_threshold") * (
len(self.optim_state.get("data_trim_list")) + 1
)
self.optim_state["last_warmup"] = iteration
else:
# This may be a false alarm; prune and continue
if self.options.get("warmup_keep_threshold_false_alarm") is None:
warmup_keep_threshold_false_alarm = self.options.get(
"warmup_keep_threshold"
)
else:
warmup_keep_threshold_false_alarm = self.options.get(
"warmup_keep_threshold_false_alarm"
)
threshold = warmup_keep_threshold_false_alarm * (
len(self.optim_state.get("data_trim_list")) + 1
)
self.optim_state["data_trim_list"] = np.append(
self.optim_state.get("data_trim_list"),
[self.optim_state.get("N")],
)
self.logging_action.append("trim data")
# Remove warm-up points from training set unless close to max
y_max = max(self.function_logger.y_orig[: self.function_logger.Xn + 1])
n_keep_min = self.D + 1
idx_keep = (y_max - self.function_logger.y_orig) < threshold
if np.sum(idx_keep) < n_keep_min:
y_temp = np.copy(self.function_logger.y_orig)
y_temp[~np.isfinite(y_temp)] = -np.inf
order = np.argsort(y_temp * -1, axis=0)
idx_keep[
order[: min(n_keep_min, self.function_logger.Xn + 1)]
] = True
# Note that using idx_keep[:, 0] is necessary since X_flag
# is a 1D array and idx_keep a 2D array.
self.function_logger.X_flag = np.logical_and(
idx_keep[:, 0], self.function_logger.X_flag
)
# Skip adaptive sampling for next iteration
self.optim_state["skip_active_sampling"] = self.options.get(
"skip_active_sampling_after_warmup"
)
# Fully recompute variational posterior
self.optim_state["recompute_var_post"] = True
def _check_termination_conditions(self):
"""
Private method to determine the status of termination conditions.
It also saves the reliability index, ELCBO improvement and stableflag
to the iteration_history object.
"""
is_finished_flag = False
termination_message = ""
# Maximum number of new function evaluations
if self.function_logger.func_count >= self.options.get(
"max_fun_evals"
):
is_finished_flag = True
termination_message = (
"Inference terminated: reached maximum number "
+ "of function evaluations options.max_fun_evals."
)
# Maximum number of iterations
iteration = self.optim_state.get("iter")
if iteration + 1 >= self.options.get("max_iter"):
is_finished_flag = True
termination_message = (
"Inference terminated: reached maximum number "
+ "of iterations options.max_iter."
)
# Quicker stability check for entropy switching
if self.optim_state.get("entropy_switch"):
tol_stable_iters = self.options.get("tol_stable_entropy_iters")
else:
tol_stable_iters = int(
math.ceil(
self.options.get("tol_stable_count")
/ self.options.get("fun_evals_per_iter")
)
)
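# (tol_stable_count is specified in function evaluations, hence the
# conversion to a number of iterations above.)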
r_index, ELCBO_improvement = self._compute_reliability_index(
tol_stable_iters
)
# Store reliability index
self.iteration_history.record("r_index", r_index, iteration)
self.iteration_history.record(
"elcbo_impro", ELCBO_improvement, iteration
)
self.optim_state["R"] = r_index
# Check stability termination condition
stableflag = False
if (
iteration + 1 >= tol_stable_iters
and r_index < 1
and ELCBO_improvement < self.options.get("tol_improvement")
):
# Count how many good iters in the recent past (excluding current)
stable_count = np.sum(
self.iteration_history.get("r_index")[
iteration - tol_stable_iters + 1 : iteration
]
< 1
)
# Iteration is stable if almost all recent iterations are stable
if (
stable_count
>= tol_stable_iters
- np.floor(
tol_stable_iters
* self.options.get("tol_stable_excpt_frac")
)
- 1
):
if self.optim_state.get("entropy_switch"):
# If stable but entropy switch is On,
# turn it off and continue
self.optim_state["entropy_switch"] = False
self.logging_action.append("entropy switch")
else:
# Allow termination only if distant from last warping
if (
iteration
- self.optim_state.get("last_successful_warping", 0)
>= tol_stable_iters / 3
):
is_finished_flag = True
termination_message = (
"Inference terminated: variational "
+ "solution stable for options.tol_stable_count "
+ "fcn evaluations."
)
stableflag = True
self.logging_action.append("stable")
# Store stability flag
self.iteration_history.record("stable", stableflag, iteration)
# Prevent early termination
if self.function_logger.func_count < self.options.get(
"min_fun_evals"
) or iteration < self.options.get("min_iter"):
is_finished_flag = False
return (
is_finished_flag,
termination_message,
)
def _compute_reliability_index(self, tol_stable_iters):
"""
Private function to compute the reliability index.
"""
iteration_idx = self.optim_state.get("iter")
# Was < 3 in MATLAB due to different indexing.
if self.optim_state.get("iter") < 2:
r_index = np.inf
ELCBO_improvement = np.nan
return r_index, ELCBO_improvement
sn = np.sqrt(self.optim_state.get("sn2_hpd"))
tol_sn = np.sqrt(sn / self.options.get("tol_sd")) * self.options.get(
"tol_sd"
)
tol_sd = min(
max(self.options.get("tol_sd"), tol_sn),
self.options.get("tol_sd") * 10,
)
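# The reliability index averages three diagnostics, each normalized by its
# tolerance: the change in ELBO since the last iteration, the ELBO
# uncertainty, and the symmetrized KL between successive posteriors.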
r_index_vec = np.full(3, np.nan)
r_index_vec[0] = (
np.abs(
self.iteration_history.get("elbo")[iteration_idx]
- self.iteration_history.get("elbo")[iteration_idx - 1]
)
/ tol_sd
)
r_index_vec[1] = (
self.iteration_history.get("elbo_sd")[iteration_idx] / tol_sd
)
r_index_vec[2] = self.iteration_history.get("sKL")[
iteration_idx
] / self.options.get("tol_skl")
# Compute average ELCBO improvement per fcn eval in the past few iters
idx0 = int(
max(
0,
self.optim_state.get("iter")
- math.ceil(0.5 * tol_stable_iters)
+ 1,
)
)
# Remember that the upper end of a range is exclusive in Python, so +1 is
# needed.
xx = self.iteration_history.get("func_count")[idx0 : iteration_idx + 1]
yy = (
self.iteration_history.get("elbo")[idx0 : iteration_idx + 1]
- self.options.get("elcbo_impro_weight")
* self.iteration_history.get("elbo_sd")[idx0 : iteration_idx + 1]
)
# The casts are needed here to get things to run.
ELCBO_improvement = np.polyfit(
list(map(float, xx)), list(map(float, yy)), 1
)[0]
return np.mean(r_index_vec), ELCBO_improvement
def _is_gp_sampling_finished(self):
"""
Private function to check if the MCMC sampling of the Gaussian Process
is finished.
"""
finished_flag = False
# Stop sampling after the sample variance has stabilized below tolerance
iteration = self.optim_state.get("iter")
w1 = np.zeros((iteration + 1))
w1[iteration] = 1
w2 = np.exp(
-(
self.iteration_history.get("N")[-1]
- self.iteration_history.get("N")
)
/ 10
)
w2 = w2 / np.sum(w2)
w = 0.5 * w1 + 0.5 * w2
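# w mixes a point mass on the current iteration (w1) with weights that
# decay exponentially in the number of function evaluations since each
# past iteration (w2).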
if np.sum(
w * self.iteration_history.get("gp_sample_var")
) < self.options.get("tol_gp_var_mcmc"):
finished_flag = True
return finished_flag
def _recompute_lcb_max(self):
"""
Recompute the moving LCB maximum based on the current GP.
"""
# ToDo: Recompute_lcb_max needs to be implemented.
return np.array([])
# Finalizing:
def final_boost(self, vp: VariationalPosterior, gp: gpr.GP):
"""
Perform a final boost of variational components.
Parameters
----------
vp : VariationalPosterior
The VariationalPosterior that should be boosted.
gp : GaussianProcess
The corresponding GaussianProcess of the VariationalPosterior.
Returns
-------
vp : VariationalPosterior
The VariationalPosterior resulting from the final boost.
elbo : float
The ELBO of the VariationalPosterior resulting from the final boost.
elbo_sd : float
The standard deviation of the ELBO of the VariationalPosterior
resulting from the final boost.
changed_flag : bool
Indicates if the final boost has taken place or not.
"""
vp = copy.deepcopy(vp)
changed_flag = False
K_new = max(vp.K, self.options.get("min_final_components"))
# Current entropy samples during variational optimization
n_sent = self.options.eval("ns_ent", {"K": K_new})
n_sent_fast = self.options.eval("ns_ent_fast", {"K": K_new})
n_sent_fine = self.options.eval("ns_ent_fine", {"K": K_new})
# Entropy samples for final boost
if self.options.get("ns_ent_boost") == []:
n_sent_boost = n_sent
else:
n_sent_boost = self.options.eval("ns_ent_boost", {"K": K_new})
if self.options.get("ns_ent_fast_boost") == []:
n_sent_fast_boost = n_sent_fast
else:
n_sent_fast_boost = self.options.eval(
"ns_ent_fast_boost", {"K": K_new}
)
if self.options.get("ns_ent_fine_boost") == []:
n_sent_fine_boost = n_sent_fine
else:
n_sent_fine_boost = self.options.eval(
"ns_ent_fine_boost", {"K": K_new}
)
# Perform final boost?
do_boost = (
vp.K < self.options.get("min_final_components")
or n_sent != n_sent_boost
or n_sent_fine != n_sent_fine_boost
)
if do_boost:
# Last variational optimization with large number of components
n_fast_opts = math.ceil(self.options.eval("ns_elbo", {"K": K_new}))
n_fast_opts = int(
math.ceil(n_fast_opts * self.options.get("ns_elbo_incr"))
)
n_slow_opts = 1
options = copy.deepcopy(self.options)
# No pruning of components
options.__setitem__("tol_weight", 0, force=True)
# End warmup
self.optim_state["warmup"] = False
vp.optimize_mu = options.get("variable_means")
vp.optimize_weights = options.get("variable_weights")
options.__setitem__("ns_ent", n_sent_boost, force=True)
options.__setitem__("ns_ent_fast", n_sent_fast_boost, force=True)
options.__setitem__("ns_ent_fine", n_sent_fine_boost, force=True)
options.__setitem__("max_iter_stochastic", np.Inf, force=True)
self.optim_state["entropy_alpha"] = 0
stable_flag = np.copy(vp.stats["stable"])
vp, __, __ = optimize_vp(
options,
self.optim_state,
vp,
gp,
n_fast_opts,
n_slow_opts,
K_new,
)
vp.stats["stable"] = stable_flag
changed_flag = True
elbo = vp.stats["elbo"]
elbo_sd = vp.stats["elbo_sd"]
return vp, elbo, elbo_sd, changed_flag
def determine_best_vp(
self,
max_idx: Union[int, None] = None,
safe_sd: float = 5,
frac_back: float = 0.25,
rank_criterion_flag: bool = False,
):
"""
Return the best VariationalPosterior found during the optimization of
VBMC, as well as its ELBO, the standard deviation of the ELBO, and the
index of the corresponding iteration.
Parameters
----------
max_idx : int, optional
Check up to this iteration. Default is ``None``, meaning the last
iteration.
safe_sd : float, optional
Penalization factor for uncertainty in the ELBO, by default 5.
frac_back : float, optional
If there is no past stable iteration, go back up to this fraction
of the total iterations, by default 0.25.
rank_criterion_flag : bool, optional
If True, use the ranking-criterion method to pick the best
solution: it finds the solution that best combines ELCBO,
stability, and recency. By default False.
Returns
-------
vp : VariationalPosterior
The best VariationalPosterior found during the optimization of VBMC.
elbo : float
The ELBO of the iteration with the best VariationalPosterior.
elbo_sd : float
The ELBO_SD of the iteration with the best VariationalPosterior.
idx_best : int
The index of the iteration with the best VariationalPosterior.
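Examples
--------
A minimal sketch, assuming ``vbmc`` has completed at least one
iteration of ``optimize()``:
>>> vp, elbo, elbo_sd, idx_best = vbmc.determine_best_vp()
>>> Xs, _ = vp.sample(1000)  # sample from the best posterior found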
"""
# Check up to this iteration (default, last)
if max_idx is None:
max_idx = self.iteration_history.get("iter")[-1]
if self.iteration_history.get("stable")[max_idx]:
# If the current iteration is stable, return it
idx_best = max_idx
else:
# Otherwise, find best solution according to various criteria
if rank_criterion_flag:
# Find solution that combines ELCBO, stability, and recency
rank = np.zeros((max_idx + 1, 4))
# Rank by recency (most recent iteration ranks first)
rank[:, 0] = np.arange(1, max_idx + 2)[::-1]
# Rank by ELCBO
lnZ_iter = self.iteration_history.get("elbo")[: max_idx + 1]
lnZsd_iter = self.iteration_history.get("elbo_sd")[
: max_idx + 1
]
elcbo = lnZ_iter - safe_sd * lnZsd_iter
order = elcbo.argsort()[::-1]
rank[order, 1] = np.arange(1, max_idx + 2)
# Rank by reliability index
order = self.iteration_history.get("r_index")[
: max_idx + 1
].argsort()
rank[order, 2] = np.arange(1, max_idx + 2)
# Apply a rank penalty to all non-stable iterations
rank[:, 3] = max_idx
rank[
self.iteration_history.get("stable")[: max_idx + 1], 3
] = 1
idx_best = np.argmin(np.sum(rank, 1))
else:
# Find recent solution with best ELCBO
laststable = np.argwhere(
self.iteration_history.get("stable")[: max_idx + 1]
)
if len(laststable) == 0:
# Go some iterations back if no previous stable iteration
idx_start = max(
0, int(math.ceil(max_idx - max_idx * frac_back))
)
else:
idx_start = np.ravel(laststable)[-1]
lnZ_iter = self.iteration_history.get("elbo")[
idx_start : max_idx + 1
]
lnZsd_iter = self.iteration_history.get("elbo_sd")[
idx_start : max_idx + 1
]
elcbo = lnZ_iter - safe_sd * lnZsd_iter
idx_best = idx_start + np.argmax(elcbo)
# Return best variational posterior, its ELBO and SD
vp = self.iteration_history.get("vp")[idx_best]
elbo = self.iteration_history.get("elbo")[idx_best]
elbo_sd = self.iteration_history.get("elbo_sd")[idx_best]
vp.stats["stable"] = self.iteration_history.get("stable")[idx_best]
return vp, elbo, elbo_sd, idx_best
def save(self, file, overwrite=False):
"""Save the VBMC instance to a file.
.. note::
Complex attributes of a VBMC instance (such as the stored
``log_joint`` callable) may not behave correctly if they have been
saved and loaded by different minor versions of Python, due to
differing dependency versions. Basic (static) data should remain
legible across versions.
Parameters
----------
file : path-like
The file name or path to write to. Default file extension `.pkl`
will be added if no extension is specified.
overwrite : bool
Whether to allow overwriting existing files. Default `False`.
Raises
------
FileExistsError
If the file already exists and ``overwrite`` is `False`.
OSError
If the file cannot be opened for other reasons (e.g., the directory
is not found, the disk is full, etc.).
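Examples
--------
A minimal sketch (the file name is hypothetical):
>>> vbmc.save("vbmc_run")  # writes to "vbmc_run.pkl"
>>> vbmc.save("vbmc_run", overwrite=True)  # replaces the existing file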
"""
filepath = Path(file)
if filepath.suffix == "":
filepath = filepath.with_suffix(".pkl")
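# Use exclusive-creation mode ("xb") unless overwriting is allowed, so
# that writing to an existing file raises FileExistsError: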
if overwrite:
mode = "wb"
else:
mode = "xb"
with open(filepath, mode=mode) as f:
dill.dump(self, f, recurse=True)
@classmethod
def load(
cls, file, new_options=None, iteration=None, set_random_state=False
):
"""Load a VBMC instance from a file.
.. note::
Complex attributes of a VBMC instance (such as the stored
``log_joint`` callable) may not behave correctly if they have been
saved and loaded by different minor versions of Python, due to
differing dependency versions. Basic (static) data should remain
legible across versions.
Parameters
----------
file : path-like
The file name or path to read from.
new_options : dict or None
A dictionary of options to change when loading the stored VBMC
instance. Useful, for example, to continue a previous run with a
larger budget of function evaluations and/or iterations. See the
documentation on PyVBMC's options for more details.
iteration : int or None
The iteration at which to initialize the stored VBMC instance.
Default is `None`, meaning initialize to the last recorded iteration.
set_random_state : bool
Whether to set the global random state to the stored random state
of the VBMC object, for reproducibility. Default `False`.
Returns
-------
vbmc : VBMC
The loaded VBMC instance, initialized at the specified iteration.
Raises
------
ValueError
If the specified ``iteration`` is less than zero or larger than the
last stored iteration.
OSError
If the file cannot be found, or cannot be opened for other reasons.
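Examples
--------
A minimal sketch of resuming a stored run with a larger budget (the
file name and option value are hypothetical):
>>> vbmc = VBMC.load(
...     "vbmc_run.pkl",
...     new_options={"max_fun_evals": 500},
...     set_random_state=True,
... )
>>> vp, results = vbmc.optimize()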
"""
filepath = Path(file)
if filepath.suffix == "":
filepath = filepath.with_suffix(".pkl")
with open(filepath, mode="rb") as f:
vbmc = dill.load(f)
# Set/check iteration
if iteration is None:
iteration = vbmc.iteration
else:
if not (0 <= iteration <= vbmc.iteration):
raise ValueError(
f"Specified iteration ({iteration}) should be >= 0 and <= last stored iteration ({vbmc.iteration})."
)
# Set states for VBMC right before the specified iteration
if hasattr(vbmc, "iteration_history"):
# Handle instances saved before running vbmc.optimize()
for attr in ["gp", "vp", "function_logger", "optim_state"]:
if vbmc.iteration_history[attr] is not None:
setattr(
vbmc, attr, vbmc.iteration_history[attr][iteration]
)
if vbmc.optim_state.get("hyp_dict") is not None:
vbmc.hyp_dict = vbmc.optim_state["hyp_dict"]
vbmc.iteration = iteration
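# Truncate the stored iteration history at the requested iteration: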
for k, v in vbmc.iteration_history.items():
if v is not None:
vbmc.iteration_history[k] = vbmc.iteration_history[k][
: (iteration + 1)
]
# Update with new options (e.g. higher number of max iterations)
if new_options is not None:
vbmc.options.is_initialized = False
vbmc.options.update(new_options)
vbmc.options.is_initialized = True
# Optionally set random state for reproducibility
if set_random_state:
random_state = vbmc.iteration_history["random_state"][iteration]
np.random.set_state(random_state)
return vbmc
def _create_result_dict(
self, idx_best: int, termination_message: str, success_flag: bool
):
"""
Private method to create the result dict.
"""
output = {}
output["function"] = str(self.function_logger.fun)
if np.all(np.isinf(self.optim_state["lb_tran"])) and np.all(
np.isinf(self.optim_state["ub_tran"])
):
output["problem_type"] = "unconstrained"
else:
output["problem_type"] = "bounded"
output["iterations"] = self.optim_state["iter"]
output["func_count"] = self.function_logger.func_count
output["best_iter"] = idx_best
output["train_set_size"] = self.iteration_history["n_eff"][idx_best]
output["components"] = self.vp.K
output["r_index"] = self.iteration_history["r_index"][idx_best]
if self.iteration_history["stable"][idx_best]:
output["convergence_status"] = "probable"
else:
output["convergence_status"] = "no"
output["overhead"] = np.NaN
output["rng_state"] = "rng"
output["algorithm"] = "Variational Bayesian Monte Carlo"
try:
__version__ = version("pyvbmc")
except PackageNotFoundError:
# package is not installed
__version__ = None
logger = logging.getLogger("VBMC")
logger.warning("Cannot read version number from package metadata.")
output["version"] = __version__
output["message"] = termination_message
output["elbo"] = self.vp.stats["elbo"]
output["elbo_sd"] = self.vp.stats["elbo_sd"]
output["success_flag"] = success_flag
return output
def _log_column_headers(self):
"""
Private method to log column headers for the iteration log.
"""
# We only want to log the column headers once when writing to a file,
# but we re-write them to the stream (stdout) when plotting.
if self.optim_state.get("iter") > 0:
logger = self.logger.stream_only
else:
logger = self.logger
if self.optim_state["cache_active"]:
logger.info(
" Iteration f-count/f-cache Mean[ELBO] Std[ELBO] "
"sKL-iter[q] K[q] Convergence Action"
)
else:
if (
self.optim_state["uncertainty_handling_level"] > 0
and self.options.get("max_repeated_observations") > 0
):
logger.info(
" Iteration f-count (x-count) Mean[ELBO] Std[ELBO]"
" sKL-iter[q] K[q] Convergence Action"
)
else:
logger.info(
" Iteration f-count Mean[ELBO] Std[ELBO] "
"sKL-iter[q] K[q] Convergence Action"
)
def _setup_logging_display_format(self):
"""
Private method to set up the display format for logging the iterations.
"""
if self.optim_state["cache_active"]:
display_format = " {:5.0f} {:5.0f} /{:5.0f} {:12.2f} "
display_format += (
"{:12.2f} {:12.2f} {:4.0f} {:10.3g} {}"
)
else:
if (
self.optim_state["uncertainty_handling_level"] > 0
and self.options.get("max_repeated_observations") > 0
):
display_format = " {:5.0f} {:5.0f} {:5.0f} {:12.2f} "
display_format += (
"{:12.2f} {:12.2f} {:4.0f} {:10.3g} "
)
display_format += "{}"
else:
display_format = " {:5.0f} {:5.0f} {:12.2f} {:12.2f} "
display_format += "{:12.2f} {:4.0f} {:10.3g} {}"
return display_format
def _init_logger(self, substring=""):
"""
Private method to initialize the logging object.
Parameters
----------
substring : str
A substring to append to the logger name (used to create separate
logging objects for initialization and optimization, in case
options change in between). Default "" (empty string).
Returns
-------
logger : logging.Logger
The main logging interface.
"""
# set up VBMC logger
logger = logging.getLogger("VBMC" + substring)
logger.setLevel(logging.INFO)
if self.options.get("display") == "off":
logger.setLevel(logging.WARNING)
elif self.options.get("display") == "iter":
logger.setLevel(logging.INFO)
elif self.options.get("display") == "full":
logger.setLevel(logging.DEBUG)
# Add a special logger for sending messages only to the default stream:
logger.stream_only = logging.getLogger("VBMC.stream_only")
# Options and special handling for writing to a file:
# If logging for the first time, get write mode from user options
# (default "a" for append)
if substring == "_init":
log_file_mode = self.options.get("log_file_mode", "a")
# On subsequent writes, switch to append mode:
else:
log_file_mode = "a"
# Avoid duplicating a handler for the same log file
# (remove duplicates here, re-add below). Iterate over a copy, since
# removeHandler() mutates the handler list, and use getattr(), since
# only FileHandler instances have a baseFilename:
log_file_name = self.options.get("log_file_name")
for handler in list(logger.handlers):
if (
log_file_name is not None
and getattr(handler, "baseFilename", None)
== os.path.abspath(log_file_name)
):
logger.removeHandler(handler)
if self.options.get("log_file_name") and self.options.get(
"log_file_level"
):
file_handler = logging.FileHandler(
filename=self.options["log_file_name"], mode=log_file_mode
)
# Set file logger level according to string or logging level:
log_file_level = self.options.get("log_file_level", logging.INFO)
if log_file_level == "off":
file_handler.setLevel(logging.WARNING)
elif log_file_level == "iter":
file_handler.setLevel(logging.INFO)
elif log_file_level == "full":
file_handler.setLevel(logging.DEBUG)
elif log_file_level in [0, 10, 20, 30, 40, 50]:
file_handler.setLevel(log_file_level)
else:
raise ValueError(
"Log file logging level is not a recognized "
"string or logging level."
)
# Add a filter to ignore messages sent to logger.stream_only:
def log_file_filter(record):
return record.name != "VBMC.stream_only"
file_handler.addFilter(log_file_filter)
logger.addHandler(file_handler)
return logger
def _init_log_joint(self, log_density, prior, log_prior, sample_prior):
# Initialize log-joint
log_likelihood = None
if (
prior is not None
or log_prior is not None
or sample_prior is not None
):
prior = convert_to_prior(prior, log_prior, sample_prior, self.D)
if prior.D != self.D:
raise ValueError(
f"Dimension of `prior` ({prior.D}) does not match dimension of model ({self.D})."
)
if prior is not None and prior.log_pdf is not None:
# Combine log-prior and log-likelihood:
log_prior = prior.log_pdf
log_likelihood = log_density
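# With user-specified target noise (uncertainty handling level 2),
# the log-likelihood returns a (value, noise_sd) tuple, so the
# log-prior is added to the value only: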
if self.optim_state["uncertainty_handling_level"] == 2:
def log_joint(theta):
ll, noise_est = log_likelihood(theta)
return ll + log_prior(theta), noise_est
else:
def log_joint(theta):
return log_likelihood(theta) + log_prior(theta)
else:
# Otherwise just use provided log-joint
log_joint = log_density
return log_joint, log_likelihood, prior
def __str__(self):
"""Construct a string summary."""
gp = getattr(getattr(self, "vp", None), "gp", None)
if gp is not None:
gp_str = f"gpyreg.{gp}"
else:
gp_str = "None"
return "VBMC:" + indent(
f"""
dimension = {self.D},
x0: {summarize(self.parameter_transformer.inverse(self.x0))},
lower bounds: {summarize(self.lower_bounds)},
upper bounds: {summarize(self.upper_bounds)},
plausible lower bounds: {summarize(self.plausible_lower_bounds)},
plausible upper bounds: {summarize(self.plausible_upper_bounds)},
log-density = {getattr(self, "log_likelihood", self.log_joint)},
log-prior = {getattr(self, "log_prior", None)},
prior sampler = {getattr(self, "sample_prior", None)},
variational posterior = {str(getattr(self, "vp", None))},
Gaussian process = {gp_str},
user options = {str(self.options)}""",
" ",
)
def __repr__(self, arr_size_thresh=10, expand=False):
"""Construct a detailed string summary.
Parameters
----------
arr_size_thresh : float, optional
If an attribute is an array whose number of elements is less than
``arr_size_thresh``, print the full array. Otherwise print only its
shape. Default `10`.
expand : bool, optional
If ``expand`` is `False`, then describe any complex child
attributes of the object by their name and memory location.
Otherwise, recursively expand the child attributes into their own
representations. Default `False`.
Returns
-------
string : str
The string representation of ``self``.
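Examples
--------
A minimal sketch, assuming ``vbmc`` is an existing ``VBMC`` instance:
>>> vbmc.__repr__(arr_size_thresh=20, expand=True)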
"""
return full_repr(
self,
"VBMC",
order=[
"D",
"x0",
"lower_bounds",
"upper_bounds",
"plausible_lower_bounds",
"plausible_upper_bounds",
"log_joint",
"log_prior",
"sample_prior",
"vp",
"K",
"gp",
"parameter_transformer",
"logger",
"logging_action",
"optim_state",
"options",
],
expand=expand,
arr_size_thresh=arr_size_thresh,
exclude=["random_state"],
)