import math
import gpyreg as gpr
import numpy as np
from pyvbmc.function_logger import FunctionLogger
from pyvbmc.stats import get_hpd
from .iteration_history import IterationHistory
from .options import Options
def train_gp(
hyp_dict: dict,
optim_state: dict,
function_logger: FunctionLogger,
iteration_history: IterationHistory,
options: Options,
plb_tran: np.ndarray,
pub_tran: np.ndarray,
):
"""
    Train a Gaussian process model.
Parameters
==========
hyp_dict : dict
Hyperparameter summary statistics dictionary.
        If it does not contain the appropriate keys, they are added
        automatically.
optim_state : dict
Optimization state from the VBMC instance we are calling this from.
function_logger : FunctionLogger
        Function logger from the VBMC instance we are calling this from.
iteration_history : IterationHistory
Iteration history from the VBMC instance we are calling this from.
options : Options
Options from the VBMC instance we are calling this from.
plb_tran : ndarray, shape (1, D)
Transformed lower plausible bounds, used to set GP hyperparameters.
pub_tran : ndarray, shape (1, D)
Transformed upper plausible bounds, used to set GP hyperparameters.
Returns
=======
gp : GP
The trained GP.
gp_s_N : int
The number of samples for fitting.
sn2_hpd : float
An estimate of the GP noise variance at high posterior density.
hyp_dict : dict
The updated summary statistics.
"""
    # Initialize hyp_dict if empty.
    for key in ("hyp", "warp", "logp", "full", "run_cov"):
        if key not in hyp_dict:
            hyp_dict[key] = None
# Get training dataset.
x_train, y_train, s2_train, t_train = _get_training_data(function_logger)
D = x_train.shape[1]
# Heuristic fitness shaping (unused even in MATLAB)
# if options.FitnessShaping
# [y_train,s2_train] = outputwarp_vbmc(X_train,y_train,s2_train,
# optimState,options);
# end
    # Pick the mean function.
mean_f = _meanfun_name_to_mean_function(optim_state["gp_mean_fun"])
# Pick the covariance function.
covariance_f = _cov_identifier_to_covariance_function(
optim_state["gp_cov_fun"]
)
# Pick the noise function.
const_add = optim_state["gp_noise_fun"][0] == 1
user_add = optim_state["gp_noise_fun"][1] == 1
user_scale = optim_state["gp_noise_fun"][1] == 2
rlod_add = optim_state["gp_noise_fun"][2] == 1
noise_f = gpr.noise_functions.GaussianNoise(
constant_add=const_add,
user_provided_add=user_add,
scale_user_provided=user_scale,
rectified_linear_output_dependent_add=rlod_add,
)
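    # For example, a noise code of [1, 0, 0] would enable only the constant
    # (jitter) noise term:
    #     gpr.noise_functions.GaussianNoise(
    #         constant_add=True,
    #         user_provided_add=False,
    #         scale_user_provided=False,
    #         rectified_linear_output_dependent_add=False,
    #     )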
    # Set up the GP.
gp = gpr.GP(D=D, covariance=covariance_f, mean=mean_f, noise=noise_f)
# Get number of samples and set priors and bounds.
gp, hyp0, gp_s_N = _gp_hyp(
optim_state, options, plb_tran, pub_tran, gp, x_train, y_train
)
# Initial GP hyperparameters.
if hyp_dict["hyp"] is None:
hyp_dict["hyp"] = hyp0.copy()
# Get GP training options.
gp_train = _get_gp_training_options(
optim_state, iteration_history, options, hyp_dict, gp_s_N
)
    # The model may have changed since the widths were computed, so discard
    # them if their size no longer matches the current hyperparameters.
if gp_train["widths"] is not None and np.size(
gp_train["widths"]
) != np.size(hyp0):
gp_train["widths"] = None
# Build starting points
# hyp0 = np.empty((0, np.size(hyp_dict["hyp"])))
hyp0 = np.empty((0, hyp_dict["hyp"].T.shape[0]))
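    # hyp0 is reset to an empty (0, hyp_N) stack of candidate starting
    # points; rows from recent iterations are appended below.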
if gp_train["init_N"] > 0 and optim_state["iter"] > 0:
# Be very careful with off-by-one errors compared to MATLAB in the
# range here.
for i in range(
math.ceil((np.size(iteration_history["gp"]) + 1) / 2) - 1,
np.size(iteration_history["gp"]),
):
hyp0 = np.concatenate(
(
hyp0,
iteration_history["gp"][i].get_hyperparameters(
as_array=True
),
)
)
N0 = hyp0.shape[0]
if N0 > gp_train["init_N"] / 2:
hyp0 = hyp0[
np.random.choice(
N0, math.ceil(gp_train["init_N"] / 2), replace=False
),
:,
]
hyp0 = np.concatenate((hyp0, np.atleast_2d(hyp_dict["hyp"])))
hyp0 = np.unique(hyp0, axis=0)
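    # Note that np.unique with axis=0 also sorts the rows while removing
    # duplicate starting points, e.g.
    # np.unique(np.array([[2., 3.], [0., 1.], [0., 1.]]), axis=0)
    # -> array([[0., 1.], [2., 3.]])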
    # The model may have changed, so discard starting points whose
    # dimensionality no longer matches the hyperparameter priors.
if hyp0.shape[1] != np.size(gp.hyper_priors["mu"]):
hyp0 = None
if (
"hyp_vp" in hyp_dict
and hyp_dict["hyp_vp"] is not None
and gp_train["sampler"] == "npv"
):
hyp0 = hyp_dict["hyp_vp"]
hyp_dict["hyp"], _, res = gp.fit(
x_train, y_train, s2_train, hyp0=hyp0, options=gp_train
)
if res is not None:
# Pre-thinning GP hyperparameters
hyp_dict["full"] = res["samples"]
hyp_dict["logp"] = res["log_priors"]
# Missing port: currently not used since we do
# not support samplers other than slice sampling.
# if isfield(gpoutput,'hyp_vp')
# hypstruct.hyp_vp = gpoutput.hyp_vp;
# end
# if isfield(gpoutput,'stepsize')
# optimState.gp_mala_step_size = gpoutput.stepsize;
# gpoutput.stepsize
# end
# Update running average of GP hyperparameter covariance (coarse)
if hyp_dict["full"] is not None and hyp_dict["full"].shape[1] > 1:
hyp_cov = np.cov(hyp_dict["full"].T)
if hyp_dict["run_cov"] is None or options["hyp_run_weight"] == 0:
hyp_dict["run_cov"] = hyp_cov
else:
w = options["hyp_run_weight"] ** options["fun_evals_per_iter"]
hyp_dict["run_cov"] = (1 - w) * hyp_cov + w * hyp_dict["run_cov"]
else:
hyp_dict["run_cov"] = None
    # Missing port: sampling from the GP for debugging (not used)
# Estimate of GP noise around the top high posterior density region
# We don't modify optim_state to contain sn2_hpd here.
sn2_hpd = _estimate_noise(gp)
return gp, gp_s_N, sn2_hpd, hyp_dict
def _meanfun_name_to_mean_function(name: str):
"""
Transforms a mean function name to an instance of that mean function.
Parameters
==========
name : str
Name of the mean function.
Returns
=======
mean_f : object
An instance of the specified mean function.
Raises
------
ValueError
Raised when the mean function name is unknown.
"""
if name == "zero":
mean_f = gpr.mean_functions.ZeroMean()
elif name == "const":
mean_f = gpr.mean_functions.ConstantMean()
elif name == "negquad":
mean_f = gpr.mean_functions.NegativeQuadratic()
else:
raise ValueError("Unknown mean function!")
return mean_f
def _cov_identifier_to_covariance_function(identifier):
"""
    Transforms a covariance function identifier to an instance of the
corresponding covariance function.
Parameters
==========
identifier : object
        Either an integer, or a list such as [3, 3] where the first
        number is the identifier and the remaining numbers are parameters
        of the covariance function.
Returns
=======
cov_f : object
An instance of the specified covariance function.
Raises
------
ValueError
Raised when the covariance function identifier is unknown.
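    Examples
    --------
    A minimal usage sketch; [3, 5] selects a Matern covariance of degree 5:

    >>> cov_f = _cov_identifier_to_covariance_function([3, 5])
    >>> isinstance(cov_f, gpr.covariance_functions.Matern)
    True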
"""
if identifier == 1:
cov_f = gpr.covariance_functions.SquaredExponential()
elif identifier == 3:
cov_f = gpr.covariance_functions.Matern(5)
elif isinstance(identifier, list) and identifier[0] == 3:
cov_f = gpr.covariance_functions.Matern(identifier[1])
else:
raise ValueError("Unknown covariance function")
return cov_f
def _gp_hyp(
optim_state: dict,
options: Options,
plb_tran: np.ndarray,
pub_tran: np.ndarray,
gp: gpr.GP,
X: np.ndarray,
y: np.ndarray,
):
"""
Define bounds, priors and samples for GP hyperparameters.
Parameters
==========
optim_state : dict
Optimization state from the VBMC instance we are calling this from.
options : Options
Options from the VBMC instance we are calling this from.
plb_tran : ndarray, shape (1, D)
Transformed lower plausible bounds, used to set GP hyperparameters.
pub_tran : ndarray, shape (1, D)
Transformed upper plausible bounds, used to set GP hyperparameters.
gp : GP
        Gaussian process for which we are setting the bounds, priors
        and so on.
X : ndarray, shape (N, D)
Training inputs.
y : ndarray, shape (N, 1)
Training targets.
Returns
=======
gp : GP
        The GP with updated priors, bounds and so on.
hyp0 : ndarray, shape (hyp_N,)
Initial guess for the hyperparameters.
gp_s_N : int
The number of samples for GP fitting.
Raises
------
TypeError
Raised if the mean function is not supported by gpyreg.
"""
# Get high posterior density dataset.
hpd_X, hpd_y, _, _ = get_hpd(X, y, options["hpd_frac"])
D = hpd_X.shape[1]
## Set GP hyperparameter defaults for VBMC.
cov_bounds_info = gp.covariance.get_bounds_info(hpd_X, hpd_y)
mean_bounds_info = gp.mean.get_bounds_info(hpd_X, hpd_y)
noise_bounds_info = gp.noise.get_bounds_info(hpd_X, hpd_y)
# Missing port: output warping hyperparameters not implemented
cov_x0 = cov_bounds_info["x0"]
mean_x0 = mean_bounds_info["x0"]
noise_x0 = noise_bounds_info["x0"]
min_noise = options["tol_gp_noise"]
noise_mult = None
if optim_state["uncertainty_handling_level"] == 0:
if options["noise_size"] != []:
noise_size = max(options["noise_size"], min_noise)
else:
noise_size = min_noise
noise_std = 0.5
elif optim_state["uncertainty_handling_level"] == 1:
        # This branch is currently neither used nor tested.
if options["noise_size"] != []:
noise_mult = max(options["noise_size"], min_noise)
noise_mult_std = np.log(10) / 2
else:
noise_mult = 1
noise_mult_std = np.log(10)
noise_size = min_noise
noise_std = np.log(10)
elif optim_state["uncertainty_handling_level"] == 2:
noise_size = min_noise
noise_std = 0.5
noise_x0[0] = np.log(noise_size)
hyp0 = np.concatenate([cov_x0, noise_x0, mean_x0])
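    # The concatenation order (covariance, noise, mean) matches the layout
    # of the hyperparameter vector used by gpyreg; _estimate_noise below
    # relies on the same ordering when slicing out the noise block.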
# Missing port: output warping hyperparameters not implemented
## Change default bounds and set priors over hyperparameters.
bounds = gp.get_bounds()
if options["upper_gp_length_factor"] > 0:
# Max GP input length scale
bounds["covariance_log_lengthscale"] = (
-np.inf,
np.log(options["upper_gp_length_factor"] * (pub_tran - plb_tran)),
)
# Increase minimum noise.
bounds["noise_log_scale"] = (np.log(min_noise), np.inf)
# Missing port: we only implement the mean functions that gpyreg supports.
if isinstance(gp.mean, gpr.mean_functions.ZeroMean):
pass
elif isinstance(gp.mean, gpr.mean_functions.ConstantMean):
# Lower maximum constant mean
bounds["mean_const"] = (-np.inf, np.min(hpd_y))
elif isinstance(gp.mean, gpr.mean_functions.NegativeQuadratic):
if options["gp_quadratic_mean_bound"]:
delta_y = max(
options["tol_sd"],
min(D, np.max(hpd_y) - np.min(hpd_y)),
)
bounds["mean_const"] = (-np.inf, np.max(hpd_y) + delta_y)
else:
raise TypeError("The mean function is not supported by gpyreg.")
# Set lower bounds for GP's outputscale and lengthscale
if isinstance(gp.covariance, gpr.covariance_functions.SquaredExponential):
bounds["covariance_log_outputscale"] = (
cov_bounds_info["LB"][D],
np.nan,
)
bounds["covariance_log_lengthscale"] = (
cov_bounds_info["LB"][:D],
np.nan,
)
    # These bounds are wider since cov_bounds_info is based on the
    # high-posterior-density region rather than the full data: a smaller
    # dataset yields smaller, and hence less restrictive, lower bounds.
# Set priors over hyperparameters (might want to double-check this)
priors = gp.get_priors()
# Hyperprior over observation noise
priors["noise_log_scale"] = (
"student_t",
(np.log(noise_size), noise_std, 3),
)
if noise_mult is not None:
priors["noise_provided_log_multiplier"] = (
"student_t",
(np.log(noise_mult), noise_mult_std, 3),
)
# Missing port: hyperprior over mixture of quadratics mean function
# Change bounds and hyperprior over output-dependent noise modulation
# Note: currently this branch is not used.
if optim_state["gp_noise_fun"][2] == 1:
bounds["noise_rectified_log_multiplier"] = (
[np.min(np.min(y), np.max(y) - 20 * D), -np.inf],
[np.max(y) - 10 * D, np.inf],
)
        # These two lines were commented out in MATLAB as well. If
        # uncommented, they would fill in the np.nan entries in the prior
        # below.
        # hypprior.mu(Ncov+2) = max(y_hpd) - 10*D;
        # hypprior.sigma(Ncov+2) = 1;
        # Only the second of the two parameters is set here.
priors["noise_rectified_log_multiplier"] = (
"student_t",
([np.nan, np.log(0.01)], [np.nan, np.log(10)], [np.nan, 3]),
)
# Missing port: priors and bounds for output warping hyperparameters
# (not used)
# VBMC used to have an empirical Bayes prior on some GP hyperparameters,
# such as input length scales, based on statistics of the GP training
# inputs. However, this approach could lead to instabilities. From the
# 2020 paper, we switched to a fixed prior based on the plausible bounds.
priors["covariance_log_lengthscale"] = (
"student_t",
(
np.log(options["gp_length_prior_mean"] * (pub_tran - plb_tran)),
options["gp_length_prior_std"],
3,
),
)
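    # As a worked example (illustrative numbers, not defaults): with
    # gp_length_prior_mean = 0.05 and a plausible range pub_tran - plb_tran
    # of 1 in some dimension, the prior on that log length scale is a
    # Student's t centered at log(0.05) ≈ -3.0 with 3 degrees of freedom.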
# Missing port: meanfun == 14 hyperprior case
# Missing port: output warping priors
## Number of GP hyperparameter samples.
stop_sampling = optim_state["stop_sampling"]
if stop_sampling == 0:
# Number of samples
gp_s_N = options["ns_gp_max"] / np.sqrt(optim_state["N"])
# Maximum sample cutoff
if optim_state["warmup"]:
gp_s_N = np.minimum(gp_s_N, options["ns_gp_max_warmup"])
else:
gp_s_N = np.minimum(gp_s_N, options["ns_gp_max_main"])
        # Stop sampling after reaching the stable-sampling threshold of
        # training points
if optim_state["N"] >= options["stable_gp_sampling"]:
stop_sampling = optim_state["N"]
        # Stop sampling after reaching the threshold of variational
        # components
if optim_state["vp_K"] >= options["stable_gp_vp_k"]:
stop_sampling = optim_state["N"]
if stop_sampling > 0:
gp_s_N = options["stable_gp_samples"]
gp.set_bounds(bounds)
gp.set_priors(priors)
return gp, hyp0, round(gp_s_N)
def _get_gp_training_options(
optim_state: dict,
iteration_history: IterationHistory,
options: Options,
hyp_dict: dict,
gp_s_N: int,
):
"""
Get options for training GP hyperparameters.
Parameters
==========
optim_state : dict
Optimization state from the VBMC instance we are calling this from.
iteration_history : IterationHistory
Iteration history from the VBMC instance we are calling this from.
options : Options
Options from the VBMC instance we are calling this from.
hyp_dict : dict
Hyperparameter summary statistic dictionary.
gp_s_N : int
Number of samples for the GP fitting.
Returns
=======
    gp_train : dict
A dictionary of GP training options.
Raises
------
ValueError
Raised if the MCMC sampler for GP hyperparameters is unknown.
"""
iteration = optim_state["iter"]
if iteration > 0:
r_index = iteration_history["r_index"][iteration - 1]
else:
r_index = np.inf
gp_train = {}
gp_train["thin"] = options["gp_sample_thin"] # MCMC thinning
gp_train["init_method"] = options["gp_train_init_method"]
gp_train["tol_opt"] = options["gp_tol_opt"]
gp_train["tol_opt_mcmc"] = options["gp_tol_opt_mcmc"]
gp_train["widths"] = None
# Get hyperparameter posterior covariance from previous iterations
hyp_cov = _get_hyp_cov(optim_state, iteration_history, options, hyp_dict)
# Setup MCMC sampler
if options["gp_hyp_sampler"] == "slicesample":
gp_train["sampler"] = "slicesample"
if options["gp_sample_widths"] > 0 and hyp_cov is not None:
width_mult = np.maximum(options["gp_sample_widths"], r_index)
hyp_widths = np.sqrt(np.diag(hyp_cov).T)
gp_train["widths"] = np.maximum(hyp_widths, 1e-3) * width_mult
elif options["gp_hyp_sampler"] == "npv":
gp_train["sampler"] = "npv"
elif options["gp_hyp_sampler"] == "mala":
gp_train["sampler"] = "mala"
if hyp_cov is not None:
gp_train["widths"] = np.sqrt(np.diag(hyp_cov).T)
if "gp_mala_step_size" in optim_state:
gp_train["step_size"] = optim_state["gp_mala_step_size"]
elif options["gp_hyp_sampler"] == "slicelite":
gp_train["sampler"] = "slicelite"
if options["gp_sample_widths"] > 0 and hyp_cov is not None:
width_mult = np.maximum(options["gp_sample_widths"], r_index)
hyp_widths = np.sqrt(np.diag(hyp_cov).T)
gp_train["widths"] = np.maximum(hyp_widths, 1e-3) * width_mult
elif options["gp_hyp_sampler"] == "splitsample":
gp_train["sampler"] = "splitsample"
if options["gp_sample_widths"] > 0 and hyp_cov is not None:
width_mult = np.maximum(options["gp_sample_widths"], r_index)
hyp_widths = np.sqrt(np.diag(hyp_cov).T)
gp_train["widths"] = np.maximum(hyp_widths, 1e-3) * width_mult
elif options["gp_hyp_sampler"] == "covsample":
if options["gp_sample_widths"] > 0 and hyp_cov is not None:
width_mult = np.maximum(options["gp_sample_widths"], r_index)
if np.all(np.isfinite(width_mult)) and np.all(
r_index < options["cov_sample_thresh"]
):
hyp_n = hyp_cov.shape[0]
gp_train["widths"] = (
hyp_cov + 1e-6 * np.eye(hyp_n)
) * width_mult**2
gp_train["sampler"] = "covsample"
gp_train["thin"] *= math.ceil(np.sqrt(hyp_n))
else:
hyp_widths = np.sqrt(np.diag(hyp_cov).T)
gp_train["widths"] = np.maximum(hyp_widths, 1e-3) * width_mult
gp_train["sampler"] = "slicesample"
else:
gp_train["sampler"] = "covsample"
elif options["gp_hyp_sampler"] == "laplace":
if optim_state["n_eff"] < 30:
gp_train["sampler"] = "slicesample"
if options["gp_sample_widths"] > 0 and hyp_cov is not None:
width_mult = np.maximum(options["gp_sample_widths"], r_index)
hyp_widths = np.sqrt(np.diag(hyp_cov).T)
gp_train["widths"] = np.maximum(hyp_widths, 1e-3) * width_mult
else:
gp_train["sampler"] = "laplace"
else:
raise ValueError("Unknown MCMC sampler for GP hyperparameters")
# N-dependent initial training points.
a = -(options["gp_train_n_init"] - options["gp_train_n_init_final"])
b = -3 * a
c = 3 * a
d = options["gp_train_n_init"]
x = (optim_state["n_eff"] - options["fun_eval_start"]) / (
min(options["max_fun_evals"], 1e3) - options["fun_eval_start"]
)
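    # The cubic f below can be written as f(x) = a * ((x - 1)**3 + 1) + d,
    # so f(0) = d = gp_train_n_init, f(1) = a + d = gp_train_n_init_final,
    # and f'(1) = 0: the number of starting points anneals smoothly from the
    # initial to the final value as function evaluations accrue.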
    f = lambda x_: a * x_**3 + b * x_**2 + c * x_ + d
init_N = max(round(f(x)), 9)
# Set other hyperparameter fitting parameters
if optim_state["recompute_var_post"]:
gp_train["burn"] = gp_train["thin"] * gp_s_N
gp_train["init_N"] = init_N
if gp_s_N > 0:
gp_train["opts_N"] = 1
else:
gp_train["opts_N"] = 2
else:
gp_train["burn"] = gp_train["thin"] * 3
if (
iteration > 1
and iteration_history["r_index"][iteration - 1]
< options["gp_retrain_threshold"]
):
gp_train["init_N"] = 0
if options["gp_hyp_sampler"] == "slicelite":
# TODO: gp_retrain_threshold is by default 1, so we get
# division by zero. what should the default be?
gp_train["burn"] = (
max(
1,
math.ceil(
gp_train["thin"]
* np.log(
iteration_history["r_index"][iteration - 1]
/ np.log(options["gp_retrain_threshold"])
)
),
)
* gp_s_N
)
gp_train["thin"] = 1
if gp_s_N > 0:
gp_train["opts_N"] = 0
else:
gp_train["opts_N"] = 1
else:
gp_train["init_N"] = init_N
if gp_s_N > 0:
gp_train["opts_N"] = 1
else:
gp_train["opts_N"] = 2
gp_train["n_samples"] = round(gp_s_N)
gp_train["burn"] = round(gp_train["burn"])
return gp_train
def _get_hyp_cov(
optim_state: dict,
iteration_history: IterationHistory,
options: Options,
hyp_dict: dict,
):
"""
Get hyperparameter posterior covariance.
Parameters
==========
optim_state : dict
Optimization state from the VBMC instance we are calling this from.
iteration_history : IterationHistory
Iteration history from the VBMC instance we are calling this from.
options : Options
Options from the VBMC instance we are calling this from.
hyp_dict : dict
Hyperparameter summary statistic dictionary.
Returns
=======
hyp_cov : ndarray, optional
The hyperparameter posterior covariance if it can be computed.
"""
if optim_state["iter"] > 0:
if options["weighted_hyp_cov"]:
w_list = []
hyp_list = []
w = 1
for i in range(0, optim_state["iter"]):
if i > 0:
# Be careful with off-by-ones compared to MATLAB here
diff_mult = max(
1,
np.log(
iteration_history["sKL"][optim_state["iter"] - i]
/ options["tol_skl"]
* options["fun_evals_per_iter"]
),
)
w *= options["hyp_run_weight"] ** (
options["fun_evals_per_iter"] * diff_mult
)
# Check if weight is getting too small.
if w < options["tol_cov_weight"]:
break
hyp = iteration_history["gp_hyp_full"][
optim_state["iter"] - 1 - i
]
hyp_n = hyp.shape[1]
if len(hyp_list) == 0 or np.shape(hyp_list)[2] == hyp.shape[0]:
hyp_list.append(hyp.T)
w_list.append(w * np.ones((hyp_n, 1)) / hyp_n)
w_list = np.concatenate(w_list)
hyp_list = np.concatenate(hyp_list)
# Normalize weights
w_list /= np.sum(w_list, axis=0)
# Weighted mean
mu_star = np.sum(hyp_list * w_list, axis=0)
# Weighted covariance matrix
hyp_n = np.shape(hyp_list)[1]
hyp_cov = np.zeros((hyp_n, hyp_n))
for j in range(0, np.shape(hyp_list)[0]):
hyp_cov += np.dot(
w_list[j],
np.dot((hyp_list[j] - mu_star).T, hyp_list[j] - mu_star),
)
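            # The 1 - sum(w_j**2) factor below is the standard bias
            # correction for a weighted sample covariance.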
hyp_cov /= 1 - np.sum(w_list**2)
return hyp_cov
return hyp_dict["run_cov"]
return None
def _get_training_data(function_logger: FunctionLogger):
"""
Get training data for building GP surrogate.
Parameters
==========
function_logger : FunctionLogger
        Function logger from the VBMC instance we are calling this from.
Returns
=======
    x_train : ndarray
        Training inputs.
    y_train : ndarray
        Training targets.
    s2_train : ndarray, optional
        Training data noise variance, if noise is used.
    t_train : ndarray
        Array of the times it took to evaluate the function on the
        training data.
"""
x_train = function_logger.X[function_logger.X_flag, :]
y_train = function_logger.y[function_logger.X_flag]
if function_logger.noise_flag:
s2_train = function_logger.S[function_logger.X_flag] ** 2
else:
s2_train = None
# Missing port: noise_shaping
t_train = function_logger.fun_eval_time[function_logger.X_flag]
return x_train, y_train, s2_train, t_train
def _estimate_noise(gp: gpr.GP):
"""Estimate GP observation noise at high posterior density.
Parameters
==========
gp : GP
The GP for which to perform the estimate.
Returns
=======
est : float
The estimate of observation noise.
"""
hpd_top = 0.2
N, _ = gp.X.shape
    # Subsample the high posterior density dataset.
    # Sort in descending order, not ascending.
order = np.argsort(gp.y, axis=None)[::-1]
hpd_N = math.ceil(hpd_top * N)
hpd_X = gp.X[order[0:hpd_N]]
hpd_y = gp.y[order[0:hpd_N]]
if gp.s2 is not None:
hpd_s2 = gp.s2[order[0:hpd_N]]
else:
hpd_s2 = None
cov_N = gp.covariance.hyperparameter_count(gp.D)
noise_N = gp.noise.hyperparameter_count()
s_N = np.size(gp.posteriors)
sn2 = np.zeros((hpd_X.shape[0], s_N))
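    # Rows of sn2 index HPD points and columns index posterior
    # hyperparameter samples: average over samples first, then take the
    # median across points for a robust scalar estimate.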
for s in range(0, s_N):
hyp = gp.posteriors[s].hyp[cov_N : cov_N + noise_N]
sn2[:, s : s + 1] = gp.noise.compute(hyp, hpd_X, hpd_y, hpd_s2)
return np.median(np.mean(sn2, axis=1))
def reupdate_gp(function_logger: FunctionLogger, gp: gpr.GP):
"""
Quick posterior reupdate of Gaussian process.
Parameters
==========
    function_logger : FunctionLogger
        Function logger from the VBMC instance we are calling this from.
    gp : GP
        The GP to update.
Returns
=======
gp : GP
The updated Gaussian process.
"""
x_train, y_train, s2_train, t_train = _get_training_data(function_logger)
gp.X = x_train
gp.y = y_train
gp.s2 = s2_train
# Missing port: gp.t = t_train
gp.update(compute_posterior=True)
# Missing port: intmean part
return gp