Source code for pyvbmc.whitening.whitening
import copy
import gpyreg as gpr
import numpy as np
[docs]
def unscent_warp(fun, x, sigma):
r"""Compute the unscented transform of the warping function `fun`.
Parameters
----------
fun : function
A single-argument function which warps input points.
x : (n,D) or (D,) np.ndarray
The input mean for which to compute the unscented transform.
sigma : (n,D) or (D,) np.ndarray
The input matrix of standard deviations or scale parameters for which
to compute the unscented transform.
Returns
-------
x_warped_mean : (n,D) or (D,) np.ndarray
The unscented estimate of the mean.
x_warped_sigma : (n,D) np.ndarray
The unscented estimate of the standard deviation / scale parameters.
x_warped : (U,n,D) np.ndarray
The warped mean points at `x_warped[0, :, :]`, and the warped std.
simplex points, at `[1:, :, :]`. Here `U=2*D+1`.
Raises
------
ValueError
If the rows/columns of `x` and `sigma` cannot be coerced to match.
"""
x_shape_orig = x.shape
x = np.atleast_2d(x)
sigma = np.atleast_2d(sigma)
(N1, D) = x.shape
(N2, D2) = sigma.shape
N = np.max([N1, N2])
if N1 not in (1, N):
raise ValueError("Mismatch between rows of X and SIGMA.")
if N2 not in (1, N):
raise ValueError("Mismatch between rows of X and SIGMA.")
if D != D2:
raise ValueError("Mismatch between columns of X and SIGMA.")
if (N1 == 1) and (N > 1):
x = np.tile(x, [N, 1])
if (N2 == 1) and (N > 1):
sigma = np.tile(sigma, [N, 1])
U = 2 * D + 1
# For each dimension, collect the points at +- one std. deviation
# along that dimension, and the original points x
xx = np.tile(x, [U, 1, 1])
for d in range(D):
sigma_slice = np.sqrt(D) * sigma[:, d]
# xx already contains:
# xx[0, :, d] = x[d]
xx[2 * d + 1, :, d] = xx[2 * d + 1, :, d] + sigma_slice
xx[2 * d + 2, :, d] = xx[2 * d + 2, :, d] - sigma_slice
# Drop points into column to apply warping, then reshape back
x_warped = np.reshape(xx, [N * U, D])
x_warped = fun(x_warped)
x_warped = np.reshape(x_warped, [U, N, D])
# Estimate the mean and standard deviation of the warped points
# by the mean and std of these sigma-points
x_warped_mean = np.mean(x_warped, axis=0).reshape(x_shape_orig)
x_warped_sigma = np.std(x_warped, axis=0, ddof=1).reshape(x_shape_orig)
return x_warped_mean, x_warped_sigma, x_warped
[docs]
def warp_input(vp, optim_state, function_logger, options):
r"""Compute input warping of variables and update the cached points in
function_logger accordingly.
Currently supports only a whitening transformation: a rotation and
rescaling of the inference space such that the variational posterior
acheives unit diagonal covariance.
Parameters
----------
vp : VariationalPosterior
The current VP object for which to compute the warping.
optim_state : dict
The dictionary recording the current optimization state.
function_logger : FunctionLogger
The record including cached function values.
Returns
-------
parameter_transformer : ParameterTransformer
A ParameterTransformer object representing the new transformation
between original coordinates and inference space coordinates, with the
input warping applied.
optim_state : dict
An updated copy of the original optimization state dict.
function_logger : FunctionLogger
An updated copy of the original function logger.
warp_action : str
The type of warping performed ("rotoscaling" or "warp")
Raises
------
NotImplementedError
If `vbmc.options["warp_nonlinear"]` is set other than False.
"""
parameter_transformer = copy.deepcopy(vp.parameter_transformer)
optim_state = copy.deepcopy(optim_state)
function_logger = copy.deepcopy(function_logger)
if options.get("warp_nonlinear"):
raise NotImplementedError("Non-linear warping is not supported.")
if options.get("warp_rotoscaling"):
if options.get("warp_nonlinear"):
raise NotImplementedError("Non-linear warping is not supported.")
else:
# Get covariance matrix analytically
__, vp_cov = vp.moments(orig_flag=False, cov_flag=True)
delta = parameter_transformer.delta
R_mat = parameter_transformer.R_mat
scale = parameter_transformer.scale
if R_mat is None:
R_mat = np.eye(vp.D)
if scale is None:
scale = np.ones(vp.D)
vp_cov = R_mat @ np.diag(scale) @ vp_cov @ np.diag(scale) @ R_mat.T
vp_cov = np.diag(delta) @ vp_cov @ np.diag(delta)
# Remove low-correlation entries
if options["warp_roto_corr_thresh"] > 0:
vp_corr = vp_cov / np.sqrt(
np.outer(np.diag(vp_cov), np.diag(vp_cov))
)
mask_idx = np.abs(vp_corr) <= options["warp_roto_corr_thresh"]
vp_cov[mask_idx] = 0
# Regularization of covariance matrix towards diagonal
if (
type(options["warp_cov_reg"]) == float
or type(options["warp_cov_reg"]) == int
):
w_reg = options["warp_cov_reg"]
else:
w_reg = options.warp_cov_reg[optim_state["N"]]
w_reg = np.max([0, np.min([1, w_reg])])
vp_cov = (1 - w_reg) * vp_cov + w_reg * np.diag(np.diag(vp_cov))
# Compute whitening transform (rotoscaling)
U, s, __ = np.linalg.svd(vp_cov)
if np.linalg.det(U) < 0:
U[:, 0] = -U[:, 0]
scale = np.sqrt(s + np.finfo(np.float64).eps)
parameter_transformer.R_mat = U
parameter_transformer.scale = scale
# Update shift and scaling and plausible bounds:
parameter_transformer.mu = np.zeros(vp.D)
parameter_transformer.delta = np.ones(vp.D)
Nrnd = 100000
xx = (
np.random.rand(Nrnd, vp.D)
* (optim_state["pub_orig"] - optim_state["plb_orig"])
+ optim_state["plb_orig"]
)
yy = parameter_transformer(xx)
# Quantile-based estimate of plausible bounds
[plb_tran, pub_tran] = np.quantile(yy, [0.05, 0.95], axis=0)
delta_temp = pub_tran - plb_tran
plb_tran = plb_tran - delta_temp / 9
pub_tran = pub_tran + delta_temp / 9
optim_state["plb_tran"] = plb_tran.reshape((1, vp.D))
optim_state["pub_tran"] = pub_tran.reshape((1, vp.D))
# Temperature scaling
if optim_state.get("temperature"):
T = optim_state["temperature"]
else:
T = 1
# Adjust stored points after warping
X_flag = function_logger.X_flag
X_orig = function_logger.X_orig[X_flag, :]
y_orig = function_logger.y_orig[X_flag].T
X = parameter_transformer(X_orig)
dy = parameter_transformer.log_abs_det_jacobian(X)
y = y_orig + dy / T
function_logger.X[X_flag, :] = X
function_logger.y[X_flag] = y.T
function_logger.parameter_transformer = parameter_transformer
# Update search bounds:
# Invert points to original space with old transform,
# then map to new space with new transform
def warpfun(x):
return parameter_transformer(vp.parameter_transformer.inverse(x))
Nrnd = 1000
xx = (
np.random.rand(Nrnd, vp.D)
* (optim_state["ub_search"] - optim_state["lb_search"])
+ optim_state["lb_search"]
)
yy = warpfun(xx)
yyMin = np.min(yy, axis=0)
yyMax = np.max(yy, axis=0)
delta = yyMax - yyMin
optim_state["lb_search"] = np.atleast_2d(yyMin - delta / Nrnd)
optim_state["ub_search"] = np.atleast_2d(yyMax + delta / Nrnd)
# If search cache is not empty, update it
if optim_state.get("search_cache"):
optim_state["search_cache"] = warpfun(optim_state["search_cache"])
# Update other state fields
optim_state["recompute_var_post"] = True
optim_state["skip_active_sampling"] = True
optim_state["warping_count"] += 1
optim_state["last_warping"] = optim_state["iter"]
optim_state["last_successful_warping"] = optim_state["iter"]
# Reset GP Hyperparameters
optim_state["run_mean"] = []
optim_state["run_cov"] = []
optim_state["last_run_avg"] = np.nan
# Warp action for output display
if options.get("warp_nonlinear"):
warp_action = "warp"
else:
warp_action = "rotoscale"
return parameter_transformer, optim_state, function_logger, warp_action
[docs]
def warp_gp_and_vp(parameter_transformer, gp_old, vp_old, vbmc):
r"""Update the GP and VP with a given warp transformation.
Applies an updated ParameterTransformer object (with new warping
transformation) to the GP and VP parameters.
Parameters
----------
parameter_transformer : ParameterTransformer
The new (warped) transformation between input coordinates and inference
coordinates.
gp_old : gpr.GP
The current Gaussian process.
vp_old : VariationalPosterior
The current variational posterior.
vbmc : VBMC
The current VBMC object.
Returns
-------
vp : VariationalPosterior
An updated copy of the original variational posterior.
hyp_warped : dict
An updated copy of the dictionary of original GP hyperparameters, with
the warping transformation applied.
"""
vp_old = copy.deepcopy(vp_old)
# Invert points from the old inference space to the input space,
# then push them back to the new inference space
def warpfun(x):
return parameter_transformer(vp_old.parameter_transformer.inverse(x))
# Temperature scaling
if vbmc.optim_state.get("temperature"):
T = vbmc.optim_state["temperature"]
else:
T = 1
# Get the number of GP hyperparameters, for indexing:
Ncov = gp_old.covariance.hyperparameter_count(vbmc.D)
Nnoise = gp_old.noise.hyperparameter_count()
Nmean = gp_old.mean.hyperparameter_count(vbmc.D)
# MATLAB: if ~isempty(gp_old.outwarpfun); Noutwarp = gp_old.Noutwarp;
# else; Noutwarp = 0; end
# (Not used, see gaussian_process.py)
Ns_gp = len(gp_old.posteriors)
hyp_warped = np.zeros([Ncov + Nnoise + Nmean, Ns_gp])
for s in range(Ns_gp):
hyp = gp_old.posteriors[s].hyp.copy()
hyp_warped[:, s] = hyp.copy()
# UpdateGP input length scales
ell = np.exp(hyp[0 : vbmc.D]).T
(__, ell_new, __) = unscent_warp(warpfun, gp_old.X, ell)
hyp_warped[0 : vbmc.D, s] = np.mean(np.log(ell_new), axis=0)
# We assume relatively no change to GP output and noise scales
if isinstance(gp_old.mean, gpr.mean_functions.ConstantMean):
# Warp constant mean
m0 = hyp[Ncov + Nnoise]
dy_old = vp_old.parameter_transformer.log_abs_det_jacobian(
gp_old.X
)
dy = parameter_transformer.log_abs_det_jacobian(warpfun(gp_old.X))
m0w = m0 + (np.mean(dy, axis=0) - np.mean(dy_old, axis=0)) / T
hyp_warped[Ncov + Nnoise, s] = m0w
elif isinstance(gp_old.mean, gpr.mean_functions.NegativeQuadratic):
# Warp quadratic mean
m0 = hyp[Ncov + Nnoise]
xm = hyp[Ncov + Nnoise + 1 : Ncov + Nnoise + vbmc.D + 1].T
omega = np.exp(
hyp[
Ncov + Nnoise + vbmc.D + 1 : Ncov + Nnoise + 2 * vbmc.D + 1
]
).T
# Warp location and scale
(xmw, omegaw, __) = unscent_warp(warpfun, xm, omega)
# Warp maximum
dy_old = vp_old.parameter_transformer.log_abs_det_jacobian(xm).T
dy = parameter_transformer.log_abs_det_jacobian(xmw).T
m0w = m0 + (dy - dy_old) / T
hyp_warped[Ncov + Nnoise, s] = m0w
hyp_warped[
Ncov + Nnoise + 1 : Ncov + Nnoise + vbmc.D + 1, s
] = xmw.T
hyp_warped[
Ncov + Nnoise + vbmc.D + 1 : Ncov + Nnoise + 2 * vbmc.D + 1, s
] = np.log(omegaw).T
else:
raise ValueError("Unsupported GP mean function for input warping.")
hyp_warped = hyp_warped.T
# Update variational posterior
vp = copy.deepcopy(vp_old)
vp.parameter_transformer = copy.deepcopy(parameter_transformer)
mu = vp.mu.T
sigmalambda = (vp_old.lambd * vp_old.sigma).T
(muw, sigmalambdaw, __) = unscent_warp(warpfun, mu, sigmalambda)
vp.mu = muw.T
lambdaw = np.sqrt(
vbmc.D
* np.mean(
(sigmalambdaw**2).T / np.sum(sigmalambdaw**2, axis=1), axis=1
)
).T
vp.lambd[:, 0] = lambdaw
sigmaw = np.exp(np.mean(np.log(sigmalambdaw / lambdaw), axis=1))
vp.sigma[0, :] = sigmaw
# Approximate change in weight:
dy_old = vp_old.parameter_transformer.log_abs_det_jacobian(mu)
dy = parameter_transformer.log_abs_det_jacobian(muw)
ww = vp_old.w * np.exp((dy - dy_old) / T)
vp.w = ww / np.sum(ww)
return vp, hyp_warped