Source code for pyvbmc.variational_posterior.variational_posterior

# for annotating VP as input of itself in mtv
from __future__ import annotations

import sys
from pathlib import Path
from textwrap import indent
from typing import Optional

import corner
import dill
import matplotlib.pyplot as plt
import numpy as np
from scipy.integrate import trapezoid
from scipy.interpolate import interp1d
from scipy.optimize import minimize
from scipy.special import gammaln

from pyvbmc.decorators import handle_0D_1D_input
from pyvbmc.formatting import format_dict, full_repr, summarize
from pyvbmc.parameter_transformer import ParameterTransformer
from pyvbmc.stats import kde_1d, kl_div_mvn


class VariationalPosterior:
    r"""
    The variational posterior class used in PyVBMC.

    The variational posterior represents the approximate posterior as
    returned by the Variational Bayesian Monte Carlo (VBMC) algorithm.

    In VBMC, the variational posterior is a weighted mixture of multivariate
    normal distributions (see Notes below for details).

    Parameters
    ----------
    D : int
        The number of dimensions (i.e., parameters) of the posterior.
    K : int, optional
        The number of mixture components, default 2.
    x0 : np.ndarray, optional
        The starting vector for the mixture components means. It can be a
        single array or multiple rows (up to `K`); missing rows are
        duplicated by making copies of `x0`, default ``np.zeros``.
    parameter_transformer : ParameterTransformer, optional
        The ``ParameterTransformer`` object specifying the transformation of
        the input space that leads to the current representation used by the
        variational posterior, by default uses an identity transform.

    Attributes
    ----------
    w : np.ndarray
        The weights of the VP mixture components, shape ``(1, K)``.
    eta : np.ndarray
        The unbounded (softmax) parametrization of the VP mixture
        components, shape ``(1, K)``.
    mu : np.ndarray
        The means of the VP mixture components, shape ``(D, K)``.
    sigma : np.ndarray
        The per-component scale of the VP mixture components, shape
        ``(1, K)``.
    lambd : np.ndarray
        The per-dimension scale of the VP mixture components, shape
        ``(D, 1)``.
    optimize_weights : bool
        Whether to optimize the weights.
    optimize_mu : bool
        Whether to optimize the means.
    optimize_sigma : bool
        Whether to optimize ``sigma``.
    optimize_lambd : bool
        Whether to optimize ``lambd``.
    parameter_transformer : ParameterTransformer
        The parameter transformer implementing transformations to/from
        unbounded space.
    bounds : dict
        A dictionary containing the soft bounds for each variable to be
        optimized.
    stats : dict
        A dictionary of statistics and other relevant info computed during
        optimization.

    Notes
    -----
    In VBMC, the variational posterior is defined as a mixture of
    multivariate normal distributions as follows:

    .. math::
        q(\theta) = \sum_{k = 1}^{K} w_k N(\theta; \mu_k, \sigma_k^2 \Lambda)

    where :math:`w_k` are the mixture weights, :math:`\mu_k` the component
    means, :math:`\sigma_k` scaling factors, and :math:`\Lambda` is a
    diagonal matrix common to all components with elements
    :math:`\lambda_d^2` on the diagonal, for :math:`1 \le d \le D`.

    Note that :math:`q(\theta)` is defined in an unconstrained space.
    Constrained variables in the posterior are mapped to a transformed,
    unconstrained space via a nonlinear mapping (represented by a
    ``ParameterTransformer`` object). The transformation is handled
    automatically. In practice, you would almost never create a new
    ``VariationalPosterior`` object yourself; simply use the variational
    posteriors returned by ``VBMC``.
    """

    def __init__(
        self, D: int, K: int = 2, x0=None, parameter_transformer=None
    ):
        self.D = D  # number of dimensions
        self.K = K  # number of mixture components
        if x0 is None:
            x0 = np.zeros((D, K))
        elif x0.size == D:
            x0 = x0.reshape(1, -1)  # ensure row vector
            x0 = np.tile(x0, (K, 1)).T  # copy the vector K times
        else:
            x0 = x0.T
            x0 = np.tile(x0, int(np.ceil(self.K / x0.shape[1])))
            x0 = x0[:, 0 : self.K]

        self.w = np.ones((1, K)) / K
        self.eta = np.ones((1, K)) / K
        self.mu = x0 + 1e-6 * np.random.randn(self.D, self.K)
        self.sigma = 1e-3 * np.ones((1, K))
        self.lambd = np.ones((self.D, 1))

        # By default, optimize all variational parameters
        self.optimize_weights = True
        self.optimize_mu = True
        self.optimize_sigma = True
        self.optimize_lambd = True

        if parameter_transformer is None:
            self.parameter_transformer = ParameterTransformer(self.D)
        else:
            self.parameter_transformer = parameter_transformer

        self.bounds = None
        self.stats = None
        self._mode = None
    def get_bounds(self, X: np.ndarray, options, K: int = None):
        """
        Compute soft bounds for variational posterior parameters.

        These bounds are used during the variational optimization in
        ``VBMC``.

        Parameters
        ----------
        X : ndarray, shape (N, D)
            Training inputs.
        options : Options
            Program options.
        K : int, optional
            The number of mixture components. By default we use the number
            provided at class instantiation.

        Returns
        -------
        theta_bnd : dict
            A dictionary of soft bounds with the following elements:
                **lb** : np.ndarray
                    Lower bounds.
                **ub** : np.ndarray
                    Upper bounds.
                **tol_con** : float
                    Fractional tolerance for constraint violation of
                    variational parameters.
                **weight_threshold** : float, optional
                    Threshold below which weights are penalized.
                **weight_penalty** : float, optional
                    The penalty for weights below the threshold.
        """
        if K is None:
            K = self.K
        # Soft-bound loss is computed on MU and SCALE (which is SIGMA times
        # LAMBDA)

        # Start with reversed bounds (see below)
        if self.bounds is None:
            self.bounds = {
                "mu_lb": np.full((self.D,), np.inf),
                "mu_ub": np.full((self.D,), -np.inf),
                "lnscale_lb": np.full((self.D,), np.inf),
                "lnscale_ub": np.full((self.D,), -np.inf),
            }

        # Set bounds for mean parameters of variational components
        self.bounds["mu_lb"] = np.minimum(
            np.min(X, axis=0), self.bounds["mu_lb"]
        )
        self.bounds["mu_ub"] = np.maximum(
            np.max(X, axis=0), self.bounds["mu_ub"]
        )

        # Set bounds for log scale parameters of variational components.
        ln_range = np.log(np.max(X, axis=0) - np.min(X, axis=0))
        self.bounds["lnscale_lb"] = np.minimum(
            self.bounds["lnscale_lb"],
            ln_range + np.log(options["tol_length"]),
        )
        self.bounds["lnscale_ub"] = np.maximum(
            self.bounds["lnscale_ub"], ln_range
        )

        # Set bounds for log weight parameters of variational components.
        if self.optimize_weights:
            # Prevent a warning from being printed when doing the final boost
            if options["tol_weight"] == 0:
                self.bounds["eta_lb"] = -np.inf
            else:
                self.bounds["eta_lb"] = np.log(0.5 * options["tol_weight"])
            self.bounds["eta_ub"] = 0

        lb_list = []
        ub_list = []
        if self.optimize_mu:
            lb_list.append(np.tile(self.bounds["mu_lb"], (K,)))
            ub_list.append(np.tile(self.bounds["mu_ub"], (K,)))
        if self.optimize_sigma or self.optimize_lambd:
            lb_list.append(np.tile(self.bounds["lnscale_lb"], (K,)))
            ub_list.append(np.tile(self.bounds["lnscale_ub"], (K,)))
        if self.optimize_weights:
            lb_list.append(np.tile(self.bounds["eta_lb"], (K,)))
            ub_list.append(np.tile(self.bounds["eta_ub"], (K,)))

        theta_bnd = {
            "lb": np.concatenate(lb_list),
            "ub": np.concatenate(ub_list),
        }
        theta_bnd["tol_con"] = options["tol_con_loss"]

        # Weights below a certain threshold are penalized.
        if self.optimize_weights:
            theta_bnd["weight_threshold"] = max(
                1 / (4 * K), options["tol_weight"]
            )
            theta_bnd["weight_penalty"] = options["weight_penalty"]

        return theta_bnd
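
    # Sketch of calling ``get_bounds`` (assuming a plain dict stands in for
    # the VBMC ``Options`` object; the keys below are the ones read here,
    # and the values are placeholders):
    #
    #     opts = {"tol_length": 1e-6, "tol_weight": 1e-2,
    #             "tol_con_loss": 0.01, "weight_penalty": 0.1}
    #     theta_bnd = vp.get_bounds(np.random.randn(50, vp.D), opts)
    #     # theta_bnd["lb"] stacks mu, ln-scale and eta bounds: (2*D*K + K,)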
    def sample(
        self,
        N: int,
        orig_flag: bool = True,
        balance_flag: bool = False,
        df: float = np.inf,
    ):
        """
        Draw random samples from the variational posterior.

        Parameters
        ----------
        N : int
            Number of samples to draw.
        orig_flag : bool, optional
            If `orig_flag` is ``True``, the random vectors are returned in
            the original parameter space. If ``False``, they are returned in
            the transformed, unconstrained space used internally by VBMC. By
            default ``True``.
        balance_flag : bool, optional
            If `balance_flag` is ``True``, the generating process is
            balanced such that the random samples come from each mixture
            component exactly proportionally (or as close as possible) to
            the variational mixture weights. If ``False``, the generating
            mixture component for each sample is determined randomly,
            according to the mixture weights. By default ``False``.
        df : float, optional
            Generate the samples from a heavy-tailed version of the
            variational posterior, in which the multivariate normal
            components have been replaced by multivariate `t`-distributions
            with `df` degrees of freedom. The default is ``np.inf``, limit
            in which the `t`-distribution becomes a multivariate normal.

        Returns
        -------
        X : np.ndarray
            `X` is an `N`-by-`D` matrix of random vectors drawn from the
            variational posterior.
        I : np.ndarray
            `I` is an `N`-by-1 array such that the `i`-th element of `I`
            indicates the index of the variational mixture component from
            which the `i`-th row of `X` has been generated.
        """
        # TODO: sampling from the GP surrogate is not implemented yet
        gp_sample = False
        if N < 1:
            x = np.zeros((0, self.D))
            i = np.zeros((0, 1))
            return x, i
        elif gp_sample:
            pass
        else:
            lambd_row = self.lambd.reshape(1, -1)
            if self.K > 1:
                if balance_flag:
                    # Exact split of samples according to mixture weights
                    repeats = np.floor(self.w * N).astype("int")
                    i = np.repeat(range(self.K), repeats.ravel())

                    # Compute remainder samples (with correct weights)
                    # if needed
                    if N > i.shape[0]:
                        w_extra = self.w * N - repeats
                        repeats_extra = np.ceil(np.sum(w_extra))
                        w_extra += self.w * (repeats_extra - sum(w_extra))
                        w_extra /= np.sum(w_extra)
                        i_extra = np.random.choice(
                            range(self.K),
                            size=repeats_extra.astype("int"),
                            p=w_extra.ravel(),
                        )
                        i = np.append(i, i_extra)

                    np.random.shuffle(i)
                    i = i[:N]
                else:
                    i = np.random.choice(
                        range(self.K), size=N, p=self.w.ravel()
                    )

                if not np.isfinite(df) or df == 0:
                    x = (
                        self.mu.T[i]
                        + lambd_row
                        * np.random.randn(N, self.D)
                        * self.sigma[:, i].T
                    )
                else:
                    t = (
                        df
                        / 2
                        / np.sqrt(np.random.gamma(df / 2, df / 2, (N, 1)))
                    )
                    x = (
                        self.mu.T[i]
                        + lambd_row
                        * np.random.randn(N, self.D)
                        * t
                        * self.sigma[:, i].T
                    )
            else:
                if not np.isfinite(df) or df == 0:
                    x = (
                        self.mu.T
                        + lambd_row * np.random.randn(N, self.D) * self.sigma
                    )
                else:
                    t = (
                        df
                        / 2
                        / np.sqrt(np.random.gamma(df / 2, df / 2, (N, 1)))
                    )
                    x = (
                        self.mu.T
                        + lambd_row
                        * t
                        * np.random.randn(N, self.D)
                        * self.sigma
                    )
                i = np.zeros(N)

        if orig_flag:
            x = self.parameter_transformer.inverse(x)

        return x, i

    @handle_0D_1D_input(patched_kwargs=["x"], patched_argpos=[0])
    def pdf(
        self,
        x: np.ndarray,
        orig_flag: bool = True,
        log_flag: bool = False,
        grad_flag: bool = False,
        df: float = np.inf,
    ):
        """
        Probability density function of the variational posterior.

        Compute the probability density function (pdf) of the variational
        posterior at one or multiple input points.

        Parameters
        ----------
        x : np.ndarray
            `x` is a matrix of inputs to evaluate the pdf at. The rows of
            the `N`-by-`D` matrix `x` correspond to observations or points,
            and columns correspond to variables or coordinates. `x` is
            assumed to be in the original space by default.
        orig_flag : bool, optional
            Whether the posterior density should be evaluated in the
            original parameter space (`orig_flag` is ``True``) or in the
            transformed space (`orig_flag` is ``False``), by default
            ``True``. Accordingly, `x` should be in the original space if
            `orig_flag` is ``True`` and in the transformed space if
            `orig_flag` is ``False``.
        log_flag : bool, optional
            If `log_flag` is ``True`` return the logarithm of the pdf, by
            default ``False``.
        grad_flag : bool, optional
            If ``True`` the gradient of the pdf is returned as a second
            output, by default ``False``.
        df : float, optional
            Compute the pdf of a heavy-tailed version of the variational
            posterior, in which the multivariate normal components have been
            replaced by multivariate `t`-distributions with `df` degrees of
            freedom. The default is `df` = ``np.inf``, limit in which the
            `t`-distribution becomes a multivariate normal.

        Returns
        -------
        pdf : np.ndarray
            The probability density of the variational posterior evaluated
            at each row of `x`.
        gradient : np.ndarray
            If `grad_flag` is ``True``, the function returns the gradient as
            well.

        Raises
        ------
        NotImplementedError
            Raised if `df` is non-zero and finite and `grad_flag` = ``True``
            (gradient of heavy-tailed pdf not supported yet).
        NotImplementedError
            Raised if `orig_flag` = ``True`` and `log_flag` = ``True`` and
            `grad_flag` = ``True`` (gradient computation of the log-pdf in
            the original space is not supported yet).
        """
        x = x.copy()
        N, D = x.shape

        # Compute the pdf only for points inside the bounds in original space
        if orig_flag:
            mask = np.logical_and(
                np.all(x > self.parameter_transformer.lb_orig, axis=1),
                np.all(x < self.parameter_transformer.ub_orig, axis=1),
            )
        else:
            mask = np.full(N, True)

        # Convert points to transformed space
        if orig_flag:
            x[mask] = self.parameter_transformer(x[mask])

        lambd_row = self.lambd.reshape(1, -1)

        y = np.zeros((N, 1))
        if grad_flag:
            dy = np.zeros((N, D))

        if not np.isfinite(df) or df == 0:
            # Compute pdf of variational posterior

            # Common normalization factor
            nf = 1 / (2 * np.pi) ** (D / 2) / np.prod(lambd_row)
            for k in range(self.K):
                d2 = np.sum(
                    ((x - self.mu.T[k]) / (self.sigma[:, k].dot(lambd_row)))
                    ** 2,
                    axis=1,
                )
                nn = (
                    nf
                    * self.w[:, k]
                    / self.sigma[:, k] ** D
                    * np.exp(-0.5 * d2)[:, np.newaxis]
                )
                y += nn
                if grad_flag:
                    dy -= (
                        nn
                        * (x - self.mu.T[k])
                        / ((lambd_row**2) * self.sigma[:, k] ** 2)
                    )
        else:
            # Compute pdf of heavy-tailed variant of variational posterior
            if df > 0:
                # (This uses a multivariate t-distribution which is not the
                # same thing as the product of D univariate t-distributions)

                # Common normalization factor
                nf = (
                    np.exp(gammaln((df + D) / 2) - gammaln(df / 2))
                    / (df * np.pi) ** (D / 2)
                    / np.prod(self.lambd)
                )
                for k in range(self.K):
                    d2 = np.sum(
                        (
                            (x - self.mu.T[k])
                            / (self.sigma[:, k].dot(lambd_row))
                        )
                        ** 2,
                        axis=1,
                    )
                    nn = (
                        nf
                        * self.w[:, k]
                        / self.sigma[:, k] ** D
                        * (1 + d2 / df) ** (-(df + D) / 2)
                    )[:, np.newaxis]
                    y += nn
                    if grad_flag:
                        raise NotImplementedError(
                            "Gradient of heavy-tailed pdf not supported yet."
                        )
            else:
                # (This uses a product of D univariate t-distributions)
                df_abs = abs(df)

                # Common normalization factor
                nf = (
                    np.exp(gammaln((df_abs + 1) / 2) - gammaln(df_abs / 2))
                    / np.sqrt(df_abs * np.pi)
                ) ** D / np.prod(self.lambd)
                for k in range(self.K):
                    d2 = (
                        (x - self.mu.T[k])
                        / (self.sigma[:, k].dot(lambd_row))
                    ) ** 2
                    nn = (
                        nf
                        * self.w[:, k]
                        / self.sigma[:, k] ** D
                        * np.prod(
                            (1 + d2 / df_abs) ** (-(df_abs + 1) / 2), axis=1
                        )[:, np.newaxis]
                    )
                    y += nn
                    if grad_flag:
                        raise NotImplementedError(
                            "Gradient of heavy-tailed pdf not supported yet."
                        )

        if log_flag:
            if grad_flag:
                dy = dy / y
            # Avoid log(0):
            zero_mask = y == 0
            y[zero_mask] = -np.inf
            y[~zero_mask] = np.log(y[~zero_mask])
            # PDF is 0 outside the original bounds:
            y[~mask] = -np.inf
        else:
            y[~mask] = 0

        # Apply Jacobian correction
        if orig_flag:
            if log_flag:
                y[mask] -= self.parameter_transformer.log_abs_det_jacobian(
                    x[mask]
                )[:, np.newaxis]
                if grad_flag:
                    raise NotImplementedError(
                        """vbmc_pdf:NoOriginalGrad: Gradient computation in
                        original space not supported yet."""
                    )
            else:
                y[mask] /= np.exp(
                    self.parameter_transformer.log_abs_det_jacobian(x[mask])[
                        :, np.newaxis
                    ]
                )

        if grad_flag:
            return y, dy
        else:
            return y

    @handle_0D_1D_input(patched_kwargs=["x"], patched_argpos=[0])
    def log_pdf(
        self,
        *args,
        **kwargs,
    ):
        """
        Log-probability density function of the variational posterior.

        Compute the log density of the variational posterior at one or
        multiple input points. The parameters are the same as for
        ``vp.pdf()`` with ``log_flag=True``; they are described again here
        for reference.

        Parameters
        ----------
        x : np.ndarray
            `x` is a matrix of inputs to evaluate the pdf at. The rows of
            the `N`-by-`D` matrix `x` correspond to observations or points,
            and columns correspond to variables or coordinates. `x` is
            assumed to be in the original space by default.
        orig_flag : bool, optional
            Whether the posterior density should be evaluated in the
            original parameter space (`orig_flag` is ``True``) or in the
            transformed space (`orig_flag` is ``False``), by default
            ``True``. Accordingly, `x` should be in the original space if
            `orig_flag` is ``True`` and in the transformed space if
            `orig_flag` is ``False``.
        grad_flag : bool, optional
            If ``True`` the gradient of the log-pdf is returned as a second
            output, by default ``False``.
        df : float, optional
            Compute the log-pdf of a heavy-tailed version of the variational
            posterior, in which the multivariate normal components have been
            replaced by multivariate `t`-distributions with `df` degrees of
            freedom. The default is `df` = ``np.inf``, limit in which the
            `t`-distribution becomes a multivariate normal.

        Returns
        -------
        log_pdf : np.ndarray
            The log probability density of the variational posterior
            evaluated at each row of `x`.
        gradient : np.ndarray
            If `grad_flag` is ``True``, the function returns the gradient as
            well.

        Raises
        ------
        NotImplementedError
            Raised if `df` is non-zero and finite and `grad_flag` = ``True``
            (gradient of heavy-tailed pdf not supported yet).
        NotImplementedError
            Raised if `orig_flag` = ``True`` and `grad_flag` = ``True``
            (gradient computation in original space not supported yet).
        """
        return self.pdf(*args, **kwargs, log_flag=True)
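
    # Sketch: evaluating the (log-)density at a batch of points
    # (illustrative):
    #
    #     x = np.zeros((3, vp.D))       # three points in the original space
    #     p = vp.pdf(x)                 # densities, shape (3, 1)
    #     lp, dlp = vp.log_pdf(x, orig_flag=False, grad_flag=True)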
    def get_parameters(self, raw_flag=True):
        """
        Get the variational posterior parameters as a single array.

        Return all the active ``VariationalPosterior`` parameters flattened
        as a 1D (numpy) array, possibly transformed.

        Parameters
        ----------
        raw_flag : bool, optional
            Specifies whether the sigma and lambda parameters are returned
            as raw (unconstrained) or not, by default ``True``.

        Returns
        -------
        theta : np.ndarray
            The variational posterior parameters flattened as a 1D array.
        """
        nl = np.sqrt(np.sum(self.lambd**2) / self.D)
        self.lambd = self.lambd.reshape(-1, 1) / nl
        self.sigma = self.sigma.reshape(1, -1) * nl

        # Ensure that weights are normalized
        if self.optimize_weights:
            self.w = self.w.reshape(1, -1) / np.sum(self.w)

        # Remove mode (at least this is done in MATLAB)
        if self.optimize_mu:
            theta = self.mu.ravel(order="F")
        else:
            theta = np.array([])

        constrained_parameters = np.array([])
        if self.optimize_sigma:
            constrained_parameters = np.concatenate(
                (constrained_parameters, self.sigma.ravel())
            )
        if self.optimize_lambd:
            constrained_parameters = np.concatenate(
                (constrained_parameters, self.lambd.ravel())
            )
        if self.optimize_weights:
            constrained_parameters = np.concatenate(
                (constrained_parameters, self.w.ravel())
            )

        if raw_flag:
            return np.concatenate((theta, np.log(constrained_parameters)))
        else:
            return np.concatenate((theta, constrained_parameters))
    def set_parameters(self, theta: np.ndarray, raw_flag=True):
        """
        Set the variational posterior parameters from a single array.

        Takes as input a ``numpy`` array and assigns it to the variational
        posterior parameters.

        Parameters
        ----------
        theta : np.ndarray
            The array with the parameters that should be assigned.
        raw_flag : bool, optional
            Specifies whether the sigma and lambda parameters are passed as
            raw (unconstrained) or not, by default ``True``.

        Raises
        ------
        ValueError
            Raised if sigma, lambda and weights are not positive and
            `raw_flag` = ``False``.
        """
        # Make sure we don't get issues with references.
        theta = theta.copy()

        # Check that sigma, lambda and weights are positive
        # when raw_flag = False.
        if not raw_flag:
            check_idx = 0
            if self.optimize_weights:
                check_idx -= self.K
            if self.optimize_lambd:
                check_idx -= self.D
            if self.optimize_sigma:
                check_idx -= self.K
            # check_idx is negative, so this slice takes the trailing
            # (constrained) parameters:
            if np.any(theta[check_idx:] < 0.0):
                raise ValueError(
                    """sigma, lambda and weights must be positive
                    when raw_flag = False"""
                )

        if self.optimize_mu:
            self.mu = np.reshape(
                theta[: self.D * self.K], (self.D, self.K), order="F"
            )
            start_idx = self.D * self.K
        else:
            start_idx = 0

        if self.optimize_sigma:
            if raw_flag:
                self.sigma = np.exp(theta[start_idx : start_idx + self.K])
            else:
                self.sigma = theta[start_idx : start_idx + self.K]
            start_idx += self.K

        if self.optimize_lambd:
            if raw_flag:
                self.lambd = np.exp(theta[start_idx : start_idx + self.D]).T
            else:
                self.lambd = theta[start_idx : start_idx + self.D].T

        if self.optimize_weights:
            eta = theta[-self.K :]
            if raw_flag:
                eta = eta - np.amax(eta)
                self.w = np.exp(eta.T)[:, np.newaxis]
            else:
                self.w = eta.T[:, np.newaxis]

        nl = np.sqrt(np.sum(self.lambd**2) / self.D)
        self.lambd = self.lambd.reshape(-1, 1) / nl
        self.sigma = self.sigma.reshape(1, -1) * nl

        # Ensure that weights are normalized
        if self.optimize_weights:
            self.w = self.w.reshape(1, -1) / np.sum(self.w)

        # Remove cached mode
        self._mode = None
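
    # Sketch: ``get_parameters`` and ``set_parameters`` are inverses, so the
    # stacked parameter vector can be round-tripped (illustrative):
    #
    #     theta = vp.get_parameters(raw_flag=True)  # 1-D raw parameter vector
    #     vp.set_parameters(theta, raw_flag=True)   # restores the same vp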
    def moments(self, N: int = int(1e6), orig_flag=True, cov_flag=False):
        """
        Compute the mean and covariance matrix of the variational posterior.

        In the original space, the moments are estimated via Monte Carlo
        sampling. If requested in the transformed (unconstrained) space used
        internally by VBMC, the moments are computed analytically.

        Parameters
        ----------
        N : int, optional
            Number of samples used to estimate the moments, by default
            ``int(1e6)``.
        orig_flag : bool, optional
            If ``True``, compute moments in the original parameter space,
            otherwise in the transformed VBMC space. By default ``True``.
        cov_flag : bool, optional
            If ``True``, return the covariance matrix as a second return
            value, by default ``False``.

        Returns
        -------
        mean : np.ndarray
            The mean of the variational posterior.
        cov : np.ndarray
            If `cov_flag` is ``True``, returns the covariance matrix as
            well.
        """
        if orig_flag:
            x, _ = self.sample(int(N), orig_flag=True, balance_flag=True)
            mubar = np.mean(x, axis=0)
            if cov_flag:
                cov = np.cov(x.T)
        else:
            mubar = np.sum(self.w * self.mu, axis=1)
            if cov_flag:
                cov = (
                    np.sum(self.w * self.sigma**2)
                    * np.eye(len(self.lambd))
                    * self.lambd**2
                )
                for k in range(self.K):
                    cov += self.w[:, k] * (
                        (self.mu[:, k] - mubar)[:, np.newaxis]
                    ).dot((self.mu[:, k] - mubar)[:, np.newaxis].T)
        if cov_flag:
            return mubar.reshape(1, -1), cov
        else:
            return mubar.reshape(1, -1)
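
    # Sketch: posterior mean and covariance in the original space
    # (illustrative):
    #
    #     mean, cov = vp.moments(N=int(1e5), cov_flag=True)
    #     # mean.shape == (1, D), cov.shape == (D, D)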
    def mode(
        self,
        orig_flag=True,
        n_opts: Optional[int] = None,
    ):
        r"""
        Find the mode of the variational posterior.

        Parameters
        ----------
        orig_flag : bool, optional
            If ``True`` find the mode of the variational posterior in the
            original parameter space, otherwise in the transformed parameter
            space. By default ``True``.
        n_opts : int, optional
            Maximum number of optimization runs from different starting
            points to find the mode. By default `n_opts` is the square root
            of the number of mixture components `K`, that is
            :math:`n\_opts = \lceil \sqrt{K} \rceil`.

        Returns
        -------
        mode : np.ndarray
            The mode of the variational posterior.

        Notes
        -----
        Mode estimation (e.g., for the purpose of maximum-a-posteriori
        estimation) is not recommended with VBMC, since due to the
        underlying representation (mixture of Gaussians) the mode of the
        variational posterior is a brittle and potentially unreliable
        estimator of the mode of the target posterior, especially if it
        lies close to the boundaries of the space.

        The mode is not invariant to nonlinear reparameterizations of the
        input space, so the mode in the original space and the mode in the
        transformed (unconstrained) space will generally be in different
        locations (even after applying the appropriate transformations).
        """
        if orig_flag and self._mode is not None:
            # Return cached mode
            return self._mode

        def neg_log_pdf(x0, orig_flag=orig_flag):
            if orig_flag:
                y = self.pdf(
                    x0, orig_flag=True, log_flag=True, grad_flag=False
                )
                return -y
            else:
                y, dy = self.pdf(
                    x0, orig_flag=False, log_flag=True, grad_flag=True
                )
                return -y, -dy

        if n_opts is None:
            n_opts = int(np.ceil(np.sqrt(self.K)))

        n_samples = int(1e5)  # Samples used for choosing starting points
        x_min = np.zeros((n_opts, self.D))
        ff = np.full((n_opts, 1), np.inf)

        for k in range(n_opts):
            # Random initial set of points to choose a starting point from
            x0_mat, _ = self.sample(n_samples, orig_flag)

            # Add centers of components to initial set for first optimization
            if k == 0:
                x0_mu = self.mu.T
                if orig_flag:
                    x0_mu = self.parameter_transformer.inverse(x0_mu)
                x0_mat = np.concatenate([x0_mat, x0_mu])

            # Evaluate pdf at all points and start optimization from the best
            y0_vec = neg_log_pdf(x0_mat)
            if not orig_flag:
                # Drop the gradient -dy
                y0_vec = y0_vec[0]
            idx = np.argmin(y0_vec.squeeze())
            x0 = x0_mat[idx]

            bounds = None
            if orig_flag:
                bounds = np.stack(
                    (
                        self.parameter_transformer.lb_orig.squeeze()
                        + np.sqrt(np.finfo(float).eps),
                        self.parameter_transformer.ub_orig.squeeze()
                        - np.sqrt(np.finfo(float).eps),
                    ),
                    axis=1,
                )
                x0 = np.minimum(
                    self.parameter_transformer.ub_orig,
                    np.maximum(x0, self.parameter_transformer.lb_orig),
                )
                x0 = x0.squeeze()

            # fun provides the gradient (jac=True) when orig_flag is False:
            res = minimize(
                fun=neg_log_pdf, x0=x0, bounds=bounds, jac=not orig_flag
            )
            x_min[k] = res.x
            ff[k] = res.fun

        # Get the best mode found and cache it
        idx_min = np.argmin(ff.squeeze())
        x = x_min[idx_min]

        if orig_flag:
            self._mode = x

        return x
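
    # Sketch: despite the caveats in the Notes above, the mode can be
    # queried directly (illustrative):
    #
    #     x_map = vp.mode()                   # cached after the first call
    #     x_map_t = vp.mode(orig_flag=False)  # mode in the transformed space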
    def mtv(
        self,
        vp2: VariationalPosterior = None,
        samples: np.ndarray = None,
        N: int = int(1e5),
    ):
        r"""
        Marginal total variation distances between two variational
        posteriors.

        Compute the total variation distance between the variational
        posterior and a second posterior, separately for each dimension
        (hence "marginal" total variation distance, MTV). The second
        posterior can be specified either as a ``VariationalPosterior`` or
        as a set of samples.

        Parameters
        ----------
        vp2 : VariationalPosterior, optional
            The other ``VariationalPosterior``, by default ``None``.
        samples : np.ndarray, optional
            An `N`-by-`D` matrix of samples from the other variational
            posterior, by default ``None``.
        N : int, optional
            The number of random draws to estimate the MTV, by default
            ``int(1e5)``.

        Returns
        -------
        mtv : np.ndarray
            A `D`-element vector whose elements are the total variation
            distances between the marginal distributions of `vp` and `vp2`
            or `samples`, for each coordinate dimension.

        Raises
        ------
        ValueError
            Raised if neither `vp2` nor `samples` are specified.

        Notes
        -----
        The total variation distance between two densities :math:`p_1` and
        :math:`p_2` is:

        .. math:: TV(p_1, p_2) = \frac{1}{2} \int | p_1(x) - p_2(x) | dx.
        """
        if vp2 is None and samples is None:
            raise ValueError("Either vp2 or samples have to be not None")

        xx1, _ = self.sample(N, True, True)
        if vp2 is not None:
            xx2, _ = vp2.sample(N, True, True)
            lb2 = vp2.parameter_transformer.lb_orig
            ub2 = vp2.parameter_transformer.ub_orig
        else:
            xx2 = samples
            lb2 = np.full((1, xx2.shape[1]), -np.inf)
            ub2 = np.full((1, xx2.shape[1]), np.inf)

        nkde = 2**13
        mtv = np.zeros((1, self.D))

        # Set bounds for the kernel density estimate
        lb1_xx = np.amin(xx1, axis=0)
        ub1_xx = np.amax(xx1, axis=0)
        range1 = ub1_xx - lb1_xx
        lb1 = np.maximum(
            lb1_xx - range1 / 10, self.parameter_transformer.lb_orig
        )
        ub1 = np.minimum(
            ub1_xx + range1 / 10, self.parameter_transformer.ub_orig
        )

        lb2_xx = np.amin(xx2, axis=0)
        ub2_xx = np.amax(xx2, axis=0)
        range2 = ub2_xx - lb2_xx
        lb2 = np.maximum(lb2_xx - range2 / 10, lb2)
        ub2 = np.minimum(ub2_xx + range2 / 10, ub2)

        # Compute the marginal total variation for each dimension
        for d in range(self.D):
            yy1, x1mesh, _ = kde_1d(xx1[:, d], nkde, lb1[:, d], ub1[:, d])
            # Ensure normalization
            yy1 = yy1 / (trapezoid(yy1) * (x1mesh[1] - x1mesh[0]))

            yy2, x2mesh, _ = kde_1d(xx2[:, d], nkde, lb2[:, d], ub2[:, d])
            # Ensure normalization
            yy2 = yy2 / (trapezoid(yy2) * (x2mesh[1] - x2mesh[0]))

            f = lambda x: np.abs(
                interp1d(
                    x1mesh,
                    yy1,
                    kind="cubic",
                    fill_value=np.array([0]),
                    bounds_error=False,
                )(x)
                - interp1d(
                    x2mesh,
                    yy2,
                    kind="cubic",
                    fill_value=np.array([0]),
                    bounds_error=False,
                )(x)
            )
            bb = np.sort(
                np.array([x1mesh[0], x1mesh[-1], x2mesh[0], x2mesh[-1]])
            )
            for j in range(3):
                xx_range = np.linspace(bb[j], bb[j + 1], num=int(1e5))
                mtv[:, d] = mtv[:, d] + 0.5 * trapezoid(f(xx_range)) * (
                    xx_range[1] - xx_range[0]
                )

        return mtv

    def kl_div(
        self,
        vp2: VariationalPosterior = None,
        samples: np.ndarray = None,
        N: int = int(1e5),
        gauss_flag: bool = False,
    ):
        """
        Kullback-Leibler divergence between two variational posteriors.

        Compute the forward and reverse Kullback-Leibler (KL) divergence
        between two posteriors. The other variational posterior can be
        specified as `vp2` (an instance of the class
        ``VariationalPosterior``) or with `samples`. One of the two must be
        specified.

        Parameters
        ----------
        vp2 : VariationalPosterior, optional
            The other ``VariationalPosterior``, by default ``None``.
        samples : np.ndarray, optional
            An `N`-by-`D` matrix of samples from the other variational
            posterior, by default ``None``.
        N : int, optional
            The number of random samples used to estimate the KL
            divergence, by default ``int(1e5)``.
        gauss_flag : bool, optional
            If ``True``, returns a "Gaussianized" KL divergence, that is,
            the KL divergence between two multivariate normal distributions
            with the same moments as the variational posteriors given as
            inputs. By default ``False``.

        Returns
        -------
        kl_div : np.ndarray
            A two-element vector containing the forward and reverse
            Kullback-Leibler divergence between the two posteriors.

        Raises
        ------
        ValueError
            Raised if neither `vp2` nor `samples` are specified.
        ValueError
            Raised if `vp2` is not provided but `gauss_flag` = ``False``.

        Notes
        -----
        Since the KL divergence is not symmetric, the method returns both
        the forward and the reverse KL divergence, that is
        KL(`vp1` || `vp2`) and KL(`vp2` || `vp1`).
        """
        if samples is None and vp2 is None:
            raise ValueError("Either vp2 or samples have to be not None")

        if not gauss_flag and vp2 is None:
            raise ValueError(
                "Unless the KL divergence is gaussianized, vp2 is required."
            )

        if gauss_flag:
            if N == 0:
                raise ValueError(
                    """Analytical moments are available
                    only for the transformed space."""
                )
            else:
                q1mu, q1sigma = self.moments(N, True, True)
                if vp2 is not None:
                    q2mu, q2sigma = vp2.moments(N, True, True)
                else:
                    q2mu = np.mean(samples, axis=0)
                    q2sigma = np.cov(samples.T)

            kls = kl_div_mvn(q1mu, q1sigma, q2mu, q2sigma)
        else:
            minp = sys.float_info.min

            xx1, _ = self.sample(N, True, True)
            q1 = self.pdf(xx1, True)
            q2 = vp2.pdf(xx1, True)
            q1[(q1 == 0) | np.isinf(q1)] = 1.0
            q2[(q2 == 0) | np.isinf(q2)] = minp
            kl1 = -np.mean(np.log(q2) - np.log(q1))

            xx2, _ = vp2.sample(N, True, True)
            q1 = self.pdf(xx2, True)
            q2 = vp2.pdf(xx2, True)
            q1[(q1 == 0) | np.isinf(q1)] = minp
            q2[(q2 == 0) | np.isinf(q2)] = 1.0
            kl2 = -np.mean(np.log(q1) - np.log(q2))

            kls = np.concatenate((kl1, kl2), axis=None)

        # Correct for numerical errors
        kls = np.maximum(0, kls)
        return kls

    def plot(
        self,
        n_samples: int = int(1e5),
        title: str = None,
        plot_data: bool = False,
        highlight_data: list = None,
        plot_vp_centres: bool = False,
        plot_style: dict = None,
        gp: GaussianProcess = None,
    ):
        """
        Plot the variational posterior.

        ``plot`` displays the variational posterior as a cornerplot showing
        the 1D and 2D marginals, estimated from samples, using the `corner
        <https://corner.readthedocs.io/en/latest/index.html>`_ package. It
        can also optionally display the centres of the variational mixture
        components and the datapoints of the underlying Gaussian process
        (GP) used by ``VBMC``. The plot can be enhanced by custom styles,
        and specific datapoints of the GP can be highlighted.

        Parameters
        ----------
        n_samples : int, optional
            The number of posterior samples used to create the plot, by
            default ``int(1e5)``.
        title : str, optional
            The title of the plot, by default ``None``.
        plot_data : bool, optional
            Whether to plot the datapoints of the GP, by default ``False``.
        highlight_data : list, optional
            Indices of the GP datapoints that should be plotted in a
            different way than the other datapoints, by default ``None``.
        plot_vp_centres : bool, optional
            Whether to plot the centres of the `vp` components, by default
            ``False``.
        plot_style : dict, optional
            A dictionary of plot styling options. The possible options are:
                **corner** : dict, optional
                    Styling options passed directly to the ``corner``
                    function. By default:
                    ``{"fig": plt.figure(figsize=(6, 6)), "labels": labels}``.
                    See the documentation of `corner
                    <https://corner.readthedocs.io/en/latest/index.html>`_.
                **data** : dict, optional
                    Styling options used to plot the GP data.
By default: ``{"s":15, "color":'blue', "facecolors": "none"}``. **highlight_data** : dict, optional Styling options used to plot the highlighted GP data. By default: ``{"s":15, "color":"orange"}``. **vp_centre** : dict, optional Styling options used to plot the `vp` centres. By default: ``{"marker":"x", "color":"red"}``. gp : gpyreg.GaussianProcess, optional The GP to use for plotting actively sampled points. Returns ------- fig : matplotlib.figure.Figure The resulting ``matplotlib`` figure of the plot. """ # generate samples Xs, _ = self.sample(n_samples) # cornerplot with samples of vp fig = plt.figure(figsize=(6, 6)) labels = ["$x_{}$".format(i) for i in range(self.D)] corner_style = dict({"fig": fig, "labels": labels}) if plot_style is None: plot_style = {} if "corner" in plot_style: corner_style.update(plot_style.get("corner")) # suppress warnings for small datasets with quiet=True fig = corner.corner(Xs, quiet=True, **corner_style) # style of the gp data data_style = dict({"s": 15, "color": "blue", "facecolors": "none"}) if "data" in plot_style: data_style.update(plot_style.get("data")) highlighted_data_style = dict( { "s": 15, "color": "orange", } ) if "highlight_data" in plot_style: highlighted_data_style.update(plot_style.get("highlight_data")) axes = np.array(fig.axes).reshape((self.D, self.D)) # plot gp data if plot_data and gp is not None: # highlight nothing when argument is None if highlight_data is None or highlight_data.size == 0: highlight_data = np.array([False] * len(gp.X)) normal_data = ~highlight_data else: normal_data = [ i for i in range(len(gp.X)) if i not in highlight_data ] orig_X_norm = self.parameter_transformer.inverse(gp.X[normal_data]) orig_X_highlight = self.parameter_transformer.inverse( gp.X[highlight_data] ) for r in range(1, self.D): for c in range(self.D - 1): if r > c: axes[r, c].scatter( orig_X_norm[:, c], orig_X_norm[:, r], **data_style ) axes[r, c].scatter( orig_X_highlight[:, c], orig_X_highlight[:, r], **highlighted_data_style, ) # Rescale to capture new GP training points: # axes[r, c].autoscale() # style of the vp centres vp_centre_style = dict( { "marker": "x", "color": "red", } ) if "vp_centre" in plot_style: vp_centre_style.update(plot_style["vp_centre"]) # plot centres of vp components if plot_vp_centres: for r in range(1, self.D): for c in range(self.D - 1): if r > c: for component in self.parameter_transformer.inverse( self.mu.T ): axes[r, c].plot( component[c], component[r], **vp_centre_style ) if title is not None: fig.suptitle(title) # adjust spacing between subplots fig.tight_layout(pad=0.5) return fig def save(self, file, overwrite=False): """Save the VP to a file. Parameters ---------- file : path-like The file name or path to write to. Default file extension `.pkl` will be added if no extension is specified. overwrite : bool Whether to allow overwriting existing files. Default `False`. Raises ------ FileExistsError If the file already exists and ``overwrite`` is `False`. OSError If the file cannot be opened for other reasons (e.g., the directory is not found, the disk is full, etc.). """ filepath = Path(file) if filepath.suffix == "": filepath = filepath.with_suffix(".pkl") if overwrite: mode = "wb" else: mode = "xb" with open(filepath, mode=mode) as f: dill.dump(self, f, recurse=True) @classmethod def load(cls, file): """Load a VP from a file. Parameters ---------- file : path-like The file name or path to write to. Returns ------- vp : VariationalPosterior The loaded VP instance. 

        Raises
        ------
        OSError
            If the file cannot be found, or cannot be opened for other
            reasons.
        """
        filepath = Path(file)
        if filepath.suffix == "":
            filepath = filepath.with_suffix(".pkl")
        with open(filepath, mode="rb") as f:
            vp = dill.load(f)
        return vp

    def __str__(self, arr_size_thresh=10):
        """Print a string summary."""
        return "VariationalPosterior:" + indent(
            f"""
dimension = {self.D},
num. components = {self.K},
means: {summarize(self.mu, arr_size_thresh)},
weights: {summarize(self.w, arr_size_thresh)},
sigma (per-component scale): {summarize(self.sigma, arr_size_thresh)},
lambda (per-dimension scale): {summarize(self.lambd, arr_size_thresh)},
stats = {format_dict(self.stats, arr_size_thresh=arr_size_thresh)}""",
            "    ",
        )

    def __repr__(self, arr_size_thresh=10, expand=False):
        """Construct a detailed string summary.

        Parameters
        ----------
        arr_size_thresh : float, optional
            If ``obj`` is an array whose product of dimensions is less than
            ``arr_size_thresh``, print the full array. Otherwise print only
            the shape. Default `10`.
        expand : bool, optional
            If ``expand`` is `False`, then describe the object's complex
            child attributes by their name and memory location. Otherwise,
            recursively expand the child attributes into their own
            representations. Default `False`.

        Returns
        -------
        string : str
            The string representation of ``self``.
        """
        return full_repr(
            self,
            "VariationalPosterior",
            order=["D", "K", "mu", "w", "sigma", "lambd", "stats"],
            expand=expand,
            arr_size_thresh=arr_size_thresh,
        )
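
# Usage sketch (illustrative): persisting and restoring a variational
# posterior with ``save``/``load``.
#
#     vp.save("my_vp.pkl", overwrite=True)
#     vp2 = VariationalPosterior.load("my_vp")  # ".pkl" added automatically
#     print(vp2)                                # summary via __str__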