Source code for pyvbmc.function_logger.function_logger

from copy import deepcopy
from textwrap import indent

import numpy as np

from pyvbmc.formatting import full_repr
from pyvbmc.parameter_transformer import ParameterTransformer
from pyvbmc.timer import Timer


class FunctionLogger:
    """
    Class that evaluates a function and caches its values.

    Parameters
    ----------
    fun : callable
        The function to be logged. `fun` must take a vector input and
        return a scalar value and, optionally, the (estimated) SD of the
        returned value (if `fun` is stochastic).
    D : int
        The number of dimensions that the function takes as input.
    noise_flag : bool
        Whether the function `fun` is stochastic or not.
    uncertainty_handling_level : {0, 1, 2}
        The uncertainty handling level, which can be one of
        (0: none; 1: unknown noise level; 2: user-provided noise).
    cache_size : int, optional
        The initial size of the caching table (default 500).
    parameter_transformer : ParameterTransformer, optional
        A ParameterTransformer used to transform the parameters between
        constrained and unconstrained space, by default None.
    """

    def __init__(
        self,
        fun,
        D: int,
        noise_flag: bool,
        uncertainty_handling_level: int,
        cache_size: int = 500,
        parameter_transformer: ParameterTransformer = None,
    ):
        self.fun = fun
        self.D: int = D
        self.noise_flag: bool = noise_flag
        self.uncertainty_handling_level: int = uncertainty_handling_level
        self.transform_parameters = parameter_transformer is not None
        self.parameter_transformer = parameter_transformer

        self.func_count: int = 0
        self.cache_count: int = 0
        self.X_orig = np.full([cache_size, self.D], np.nan)
        self.y_orig = np.full([cache_size, 1], np.nan)
        self.X = np.full([cache_size, self.D], np.nan)
        self.y = np.full([cache_size, 1], np.nan)
        self.y_max = np.nan
        self.n_evals = np.full([cache_size, 1], 0)

        if self.noise_flag:
            self.S = np.full([cache_size, 1], np.nan)

        self.Xn: int = -1  # Last filled entry
        # Use 1D array since this is a boolean mask.
        self.X_flag = np.full((cache_size,), False, dtype=bool)
        self.y_max = float("-Inf")
        self.fun_eval_time = np.full([cache_size, 1], np.nan)
        self.total_fun_eval_time = 0
    def __call__(self, x: np.ndarray):
        """
        Evaluate the function `fun` at ``x`` and cache the values.

        Parameters
        ----------
        x : np.ndarray
            The point at which the function will be evaluated. The shape
            of x should be (1, D) or (D,).

        Returns
        -------
        f_val : float
            The result of the evaluation.
        SD : float
            The (estimated) SD of the returned value.
        idx : int
            The index of the last updated entry.

        Raises
        ------
        ValueError
            If the input cannot be coerced to 1-D.
        ValueError
            If the function value is not a finite real-valued scalar.
        ValueError
            If the (estimated) SD (second function output) is not a finite,
            positive real-valued scalar.
        """
        timer = Timer()

        x_shape_orig = x.shape
        if x.ndim > 1:
            x = x.squeeze()
        if x.ndim == 0:
            x = np.atleast_1d(x)
        if x.size != x.shape[0]:
            raise ValueError(
                f"Input should be one-dimensional but has shape {x_shape_orig}."
            )

        # Convert back to original space
        if self.transform_parameters:
            x_orig = self.parameter_transformer.inverse(
                np.reshape(x, (1, x.shape[0]))
            )[0]
        else:
            x_orig = x

        timer.start_timer("fun_time")
        try:
            if self.noise_flag and self.uncertainty_handling_level == 2:
                f_val_orig, f_sd = self.fun(x_orig)
            else:
                f_val_orig = self.fun(x_orig)
                if self.noise_flag:
                    f_sd = 1
                else:
                    f_sd = None
            if isinstance(f_val_orig, np.ndarray):
                # f_val_orig can only be an array with size 1
                f_val_orig = f_val_orig.item()
        except Exception as err:
            err.args += (
                "FunctionLogger:FuncError "
                + "Error in executing the logged function "
                + "with input: "
                + str(x_orig),
            )
            raise
        timer.stop_timer("fun_time")

        # if f_val is an array with only one element, extract that element
        if not np.isscalar(f_val_orig) and np.size(f_val_orig) == 1:
            f_val_orig = np.array(f_val_orig).flat[0]

        # Check function value
        if (
            not np.isscalar(f_val_orig)
            or not np.isfinite(f_val_orig)
            or not np.isreal(f_val_orig)
        ):
            error_message = """FunctionLogger:InvalidFuncValue:
            The returned function value must be a finite real-valued scalar
            (returned value {})"""
            raise ValueError(error_message.format(str(f_val_orig)))

        # Check returned function SD
        if self.noise_flag and (
            not np.isscalar(f_sd)
            or not np.isfinite(f_sd)
            or not np.isreal(f_sd)
            or f_sd <= 0.0
        ):
            error_message = """FunctionLogger:InvalidNoiseValue
            The returned estimated SD (second function output) must be a
            finite, positive real-valued scalar (returned SD: {})"""
            raise ValueError(error_message.format(str(f_sd)))

        # record timer stats
        funtime = timer.get_duration("fun_time")

        self.func_count += 1
        f_val, idx = self._record(x_orig, x, f_val_orig, f_sd, funtime)

        # optimstate.N = self.Xn
        # optimstate.N_eff = np.sum(self.n_evals[self.X_flag])
        # optimState.totalfunevaltime = optimState.totalfunevaltime + t;

        return f_val, f_sd, idx
    def add(
        self,
        x: np.ndarray,
        f_val_orig: float,
        f_sd: float = None,
        fun_eval_time=np.nan,
    ):
        """
        Add a previously evaluated function sample to the function cache.

        Parameters
        ----------
        x : np.ndarray
            The point at which the function has been evaluated. The shape
            of x should be (1, D) or (D,).
        f_val_orig : float
            The result of the evaluation of the function.
        f_sd : float, optional
            The (estimated) SD of the returned value (if heteroskedastic
            noise handling is on) of the evaluation of the function,
            by default None.
        fun_eval_time : float
            The time it took to evaluate the function, by default np.nan.

        Returns
        -------
        f_val : float
            The result of the evaluation.
        SD : float
            The (estimated) SD of the returned value.
        idx : int
            The index of the last updated entry.

        Raises
        ------
        ValueError
            If the input cannot be coerced to 1-D.
        ValueError
            If the function value is not a finite real-valued scalar.
        ValueError
            If the (estimated) SD (second function output) is not a finite,
            positive real-valued scalar.
        """
        x_shape_orig = x.shape
        if x.ndim > 1:
            x = x.squeeze()
        if x.ndim == 0:
            x = np.atleast_1d(x)
        if x.size != x.shape[0]:
            raise ValueError(
                f"Input should be one-dimensional but has shape {x_shape_orig}."
            )

        # Convert back to original space
        if self.transform_parameters:
            x_orig = self.parameter_transformer.inverse(
                np.reshape(x, (1, x.shape[0]))
            )[0]
        else:
            x_orig = x

        if self.noise_flag:
            if f_sd is None:
                f_sd = 1
        else:
            f_sd = None

        # Check function value
        if (
            not np.isscalar(f_val_orig)
            or not np.isfinite(f_val_orig)
            or not np.isreal(f_val_orig)
        ):
            error_message = """FunctionLogger:InvalidFuncValue:
            The returned function value must be a finite real-valued scalar
            (returned value {})"""
            raise ValueError(error_message.format(str(f_val_orig)))

        # Check returned function SD
        if self.noise_flag and (
            not np.isscalar(f_sd)
            or not np.isfinite(f_sd)
            or not np.isreal(f_sd)
            or f_sd <= 0.0
        ):
            error_message = """FunctionLogger:InvalidNoiseValue
            The returned estimated SD (second function output) must be a
            finite, positive real-valued scalar (returned SD: {})"""
            raise ValueError(error_message.format(str(f_sd)))

        self.cache_count += 1
        f_val, idx = self._record(x_orig, x, f_val_orig, f_sd, fun_eval_time)
        return f_val, f_sd, idx
    def finalize(self):
        """
        Remove unused caching entries.
        """
        self.X_orig = self.X_orig[: self.Xn + 1]
        self.y_orig = self.y_orig[: self.Xn + 1]

        # in the original MATLAB version X and Y get deleted
        self.X = self.X[: self.Xn + 1]
        self.y = self.y[: self.Xn + 1]

        if self.noise_flag:
            self.S = self.S[: self.Xn + 1]
        self.X_flag = self.X_flag[: self.Xn + 1]
        self.fun_eval_time = self.fun_eval_time[: self.Xn + 1]
    def _expand_arrays(self, resize_amount: int = None):
        """
        A private method to extend the rows of the object attribute arrays.

        Parameters
        ----------
        resize_amount : int, optional
            The number of additional rows; by default expand the current
            table by 50%.
        """
        if resize_amount is None:
            resize_amount = int(np.max((np.ceil(self.Xn / 2), 1)))

        self.X_orig = np.append(
            self.X_orig, np.full([resize_amount, self.D], np.nan), axis=0
        )
        self.y_orig = np.append(
            self.y_orig, np.full([resize_amount, 1], np.nan), axis=0
        )
        self.X = np.append(
            self.X, np.full([resize_amount, self.D], np.nan), axis=0
        )
        self.y = np.append(self.y, np.full([resize_amount, 1], np.nan), axis=0)

        if self.noise_flag:
            self.S = np.append(
                self.S, np.full([resize_amount, 1], np.nan), axis=0
            )
        self.X_flag = np.append(
            self.X_flag, np.full((resize_amount,), False, dtype=bool)
        )
        self.fun_eval_time = np.append(
            self.fun_eval_time, np.full([resize_amount, 1], np.nan), axis=0
        )
        self.n_evals = np.append(
            self.n_evals, np.full([resize_amount, 1], 0), axis=0
        )

    def _record(
        self,
        x_orig: float,
        x: float,
        f_val_orig: float,
        f_sd: float,
        fun_eval_time: float,
    ):
        """
        A private method to save function values to class attributes.

        Parameters
        ----------
        x_orig : float
            The point at which the function has been evaluated
            (in original space).
        x : float
            The point at which the function has been evaluated
            (in transformed space).
        f_val_orig : float
            The result of the evaluation.
        f_sd : float
            The (estimated) SD of the returned value (if heteroskedastic
            noise handling is on).
        fun_eval_time : float
            The time it took to evaluate the function.

        Returns
        -------
        f_val : float
            The result of the evaluation.
        idx : int
            The index of the last updated entry.

        Raises
        ------
        ValueError
            If there is more than one match for a duplicate entry.
""" duplicate_flag = (self.X == x).all(axis=1) if np.any(duplicate_flag): if np.sum(duplicate_flag) > 1: raise ValueError("More than one match for duplicate entry.") idx = np.argwhere(duplicate_flag)[0, 0] N = self.n_evals[idx] if f_sd is not None: tau_n = 1 / self.S[idx] ** 2 tau_1 = 1 / f_sd**2 self.y_orig[idx] = ( tau_n * self.y_orig[idx] + tau_1 * f_val_orig ) / (tau_n + tau_1) self.S[idx] = 1 / np.sqrt(tau_n + tau_1) else: self.y_orig[idx] = (N * self.y_orig[idx] + f_val_orig) / ( N + 1 ) f_val = self.y_orig[idx] if self.transform_parameters: f_val += self.parameter_transformer.log_abs_det_jacobian(x) self.y[idx] = f_val self.fun_eval_time[idx] = ( N * self.fun_eval_time[idx] + fun_eval_time ) / (N + 1) self.n_evals[idx] += 1 return f_val, idx else: self.Xn += 1 if self.Xn > self.X_orig.shape[0] - 1: self._expand_arrays() # record function time if not np.isnan(fun_eval_time): self.fun_eval_time[self.Xn] = fun_eval_time self.total_fun_eval_time += fun_eval_time self.X_orig[self.Xn] = x_orig self.X[self.Xn] = x self.y_orig[self.Xn] = f_val_orig f_val = f_val_orig if self.transform_parameters: f_val += self.parameter_transformer.log_abs_det_jacobian( np.reshape(x, (1, x.shape[0])) )[0] self.y[self.Xn] = f_val if f_sd is not None: self.S[self.Xn] = f_sd self.X_flag[self.Xn] = True self.n_evals[self.Xn] += 1 self.y_max = np.nanmax(self.y[self.X_flag]) return f_val, self.Xn def __deepcopy__(self, memo): cls = self.__class__ result = cls.__new__(cls) # Avoid infinite recursion in deepcopy memo[id(self)] = result # Copy class properties: for k, v in self.__dict__.items(): if k == "fun": # Avoid deepcopy of log-joint function # (interferes with benchmark logging) setattr(result, k, v) else: setattr(result, k, deepcopy(v, memo)) return result def __str__(self, arr_size_thresh=10): """Print a string summary.""" return "FunctionLogger:" + indent( f""" function = {self.fun}, dimension = {self.D}, noisy = {self.noise_flag}, num. evaluations = {self.func_count}, y max = {self.y_max}, fun. eval. time = {self.total_fun_eval_time}""", " ", ) def __repr__(self, arr_size_thresh=10, expand=False): """Construct a detailed string summary. Parameters ---------- arr_size_thresh : float, optional If ``obj`` is an array whose product of dimensions is less than ``arr_size_thresh``, print the full array. Otherwise print only the shape. Default `10`. expand : bool, optional If ``expand`` is `False`, then describe any complex child attributes of the object by their name and memory location. Otherwise, recursively expand the child attributes into their own representations. Default `False`. Returns ------- string : str The string representation of ``self``. """ return full_repr( self, "FunctionLogger", expand=expand, arr_size_thresh=arr_size_thresh, )