Source code for pyvbmc.acquisition_functions.abstract_acq_fcn

import sys
from abc import ABC, abstractmethod

import gpyreg as gpr
import numpy as np

from pyvbmc.function_logger import FunctionLogger
from pyvbmc.parameter_transformer import ParameterTransformer
from pyvbmc.variational_posterior import VariationalPosterior


[docs] class AbstractAcqFcn(ABC): """ Abstract acquisition function for VBMC. """ def __init__(self): self.acq_info = {} self.acq_info["compute_var_log_joint"] = False # Whether the function value is in log space self.acq_info["log_flag"] = False
[docs] def get_info(self): """ Return a dict with information about the acquisition function. Returns ------- acq_info : dict A dict containing information about the acquisition function. """ return self.acq_info
[docs] def __call__( self, Xs: np.ndarray, gp: gpr.GP, vp: VariationalPosterior, function_logger: FunctionLogger, optim_state: dict, ): """ Calculate the acquisition function for the given inputs. Parameters ---------- Xs : np.ndarray Input points. gp : gpr.GP The GaussianProcess of the VBMC instance this function is called from. vp : VariationalPosterior The VariationalPosterior of the VBMC instance this function is called from. function_logger : FunctionLogger The FunctionLogger of the VBMC instance this function is called from. optim_state : dict The optim_state of the VBMC instance this function is called from. Returns ------- acq : np.ndarray The output of the acquisition function, shape ``(N,)`` where ``N`` is the number of input points. """ if Xs.ndim == 1: Xs = Xs[None, :] # Map integer inputs Xs = self._real2int( Xs, vp.parameter_transformer, optim_state.get("integer_vars") ) # Compute GP posterior predictive mean and variance # GP mean and variance for each hyperparameter sample f_mu, f_s2 = gp.predict(x_star=Xs, separate_samples=True) # Compute total variance Ns = f_mu.shape[1] f_bar = np.sum(f_mu, axis=1, keepdims=True) / Ns # Mean across samples var_bar = ( np.sum(f_s2, axis=1, keepdims=True) / Ns ) # Average variance across samples # Sample variance if Ns > 1: var_f = np.sum((f_mu - f_bar) ** 2, axis=1, keepdims=True) / ( Ns - 1 ) else: var_f = 0 f_bar = np.ravel(f_bar) var_tot = np.ravel(var_f + var_bar) # Total variance # Compute acquisition function acq = self._compute_acquisition_function( Xs, vp, gp, function_logger, optim_state, f_mu, f_s2, f_bar, var_tot, ) # Regularization: penalize points where GP uncertainty # is below threshold if optim_state.get("variance_regularized_acq_fcn"): # Try not to go below this variance tol_var = optim_state.get("tol_gp_var") idx_gp_uncertainty = var_tot < tol_var if np.any(idx_gp_uncertainty): if self.acq_info.get("log_flag"): acq[idx_gp_uncertainty] += ( tol_var / var_tot[idx_gp_uncertainty] - 1 ) else: acq[idx_gp_uncertainty] *= np.exp( -(tol_var / var_tot[idx_gp_uncertainty] - 1) ) realmax = sys.float_info.max acq = np.maximum(acq, -realmax) # Hard bound checking: discard points too close to bounds X_orig = vp.parameter_transformer.inverse(Xs) idx_bounds = np.logical_or( np.any(X_orig < optim_state.get("lb_eps_orig"), axis=1), np.any(X_orig > optim_state.get("ub_eps_orig"), axis=1), ) acq[idx_bounds] = np.Inf # Re-shape to 1-D, if necessary (to avoid errors in cma.fmin) if acq.ndim > 1: if acq.shape[0] != acq.size and acq.shape[1] != acq.size: raise ValueError( "Acquisition function should return a 1-D result (or a 2-D result which has size 1 along at least one axis)." ) acq = acq.reshape(-1) return acq
@abstractmethod def _compute_acquisition_function( self, Xs: np.ndarray, vp: VariationalPosterior, gp: gpr.GP, function_logger: FunctionLogger, optim_state: dict, f_mu: np.ndarray, f_s2: np.ndarray, f_bar: np.ndarray, var_tot: np.ndarray, ): """ Abstract method that must be implemented in each subclass. It computes the value of the acquisition function. """ @staticmethod def _real2int( X: np.ndarray, parameter_transformer: ParameterTransformer, integer_vars: np.ndarray, ): """ Convert to integer-valued representation. Parameters ---------- X : np.ndarray The points to be converted. parameter_transformer : ParameterTransformer The appropriate ParameterTransformer to convert between the spaces. integer_vars : np.ndarray A mask to determine which dimensions are integer vars. """ if np.any(integer_vars): X_temp = parameter_transformer.inverse(X) X_temp[:, integer_vars] = np.around(X_temp[:, integer_vars]) X_temp = parameter_transformer(X_temp) X[:, integer_vars] = X_temp[:, integer_vars] return X @staticmethod def _sq_dist(a: np.array, b: np.array): """ Compute matrix of all pairwise squared distances between two sets of vectors, stored in the columns of the two matrices `a` and `b`. Parameters ---------- a : np.array, shape (n, D) First set of vectors. b : np.array, shape (m, D) Second set of vectors. Returns ------- c: np.array, shape(n, m) The matrix of all pairwise squared distances. """ n = a.shape[0] m = b.shape[0] mu = (m / (n + m)) * np.mean(b, axis=0) + (n / (n + m)) * np.mean( a, axis=0 ) a = a - mu b = b - mu c = np.sum(a * a, axis=1, keepdims=True) + ( np.sum(b * b, axis=1, keepdims=True).T - (2 * a @ b.T) ) return np.maximum(c, 0) def _estimate_observation_noise( self, Xs: np.ndarray, gp: gpr.GP, optim_state: dict ): """ Estimate observation noise at test points from nearest neighbor. Parameters ---------- Xs : np.ndarray The test points. gp : gpr.GP The GaussianProcess of the VBMC instance this function is called from. optim_state : dict The optim_state of the VBMC instance this function is called from. Returns ------- sn2 : np.ndarray The estimated observation noise. """ # unravel_index as the indicies are 1D otherwise pos = np.argmin( self._sq_dist( Xs / optim_state.get("gp_length_scale"), gp.temporary_data.get("X_rescaled"), ), axis=1, ) sn2 = gp.temporary_data.get("sn2_new")[pos] return sn2