syne_tune/optimizer/schedulers/searchers/bayesopt/gpautograd/learncurve/issm.py
def sample_posterior_joint(
poster_state: Dict, mean, kernel, feature, targets: np.ndarray,
issm_params: Dict, r_min: int, r_max: int, random_state: RandomState,
num_samples: int = 1) -> Dict:
"""
Given `poster_state` for some data plus one additional configuration
with data (`feature`, `targets`, `issm_params`), draw joint samples
of the latent variables not fixed by the data, and of the latent
target values. `targets` may be empty, but must not reach all the
way to `r_max`. The additional configuration must not be in the
dataset used to compute `poster_state`.
    If `targets` corresponds to resource values range(r_min, r_obs), we
    sample latent target values y_r for range(r_obs, r_max + 1) and
    latent function values f_r for range(r_obs - 1, r_max + 1), unless
    r_obs = r_min (i.e., `targets` is empty), in which case both [y_r]
    and [f_r] range over range(r_min, r_max + 1). We return a dict with
    [f_r] under `f` and [y_r] under `y`; both are matrices with
    `num_samples` columns.
:param poster_state: Posterior state for data
:param mean: Mean function
:param kernel: Kernel function
:param feature: Features for additional config
:param targets: Target values for additional config
:param issm_params: Likelihood parameters for additional config
:param r_min: Smallest resource value
:param r_max: Largest resource value
:param random_state: numpy.random.RandomState
:param num_samples: Number of joint samples to draw (default: 1)
:return: See above
"""
num_res = r_max + 1 - r_min
targets = _colvec(targets, _np=np)
ydim = targets.size
t_obs = num_res - ydim
assert t_obs > 0, f"targets.size = {ydim} must be < {num_res}"
assert getval(poster_state['pmat'].shape[1]) == 1, \
"sample_posterior_joint cannot be used for posterior state " +\
"based on fantasizing"
# ISSM parameters
alpha = issm_params['alpha'][0]
alpha_m1 = alpha - 1.0
beta = issm_params['beta'][0]
gamma = issm_params['gamma']
# Posterior mean and variance of h for additional config
post_mean, post_variance = predict_posterior_marginals(
poster_state, mean, kernel, _rowvec(feature, _np=np))
post_mean = post_mean[0].item()
post_variance = post_variance[0].item()
# Compute [a_t], [gamma^2 a_t^2]
lrvec = np.array(
[np.log(r_max - t) for t in range(num_res - 1)]) * alpha_m1 + beta
avec = alpha * np.exp(lrvec)
a2vec = np.square(alpha * gamma) * np.exp(lrvec * 2.0)
# Draw the [eps_t] for all samples
epsmat = random_state.normal(size=(num_res, num_samples))
# Compute samples [f_t], [y_t], not conditioned on targets
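    # ISSM recursion as implemented in the loop below (t = 0 corresponds
    # to resource r_max; rows are reversed into increasing r further down):
    #     f_0 = h,    y_t = f_t + eps_t,    eps_t ~ N(0, 1) iid,
    #     f_{t+1} = f_t - a_t * (gamma * eps_t + 1),
    # where a_t = alpha * exp(beta) * (r_max - t)^(alpha - 1)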
hvec = random_state.normal(
size=(1, num_samples)) * np.sqrt(post_variance) + post_mean
f_rows = []
y_rows = []
fcurr = hvec
for t in range(num_res - 1):
eps_row = _rowvec(epsmat[t], _np=np)
f_rows.append(fcurr)
y_rows.append(fcurr + eps_row)
fcurr = fcurr - avec[t] * (eps_row * gamma + 1.0)
eps_row = _rowvec(epsmat[-1], _np=np)
f_rows.append(fcurr)
y_rows.append(fcurr + eps_row)
if ydim > 0:
# Condition on targets
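        # Each prior sample was drawn jointly with its own sampled targets
        # over range(r_min, r_obs), the same range the observed `targets`
        # cover. Below, we compute per-sample conditional means of [f_r]
        # given these sampled targets, as well as the conditional mean
        # given the observed targets; shifting each sample by the
        # difference yields posterior samples (Matheron's rule)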
# Prior samples (reverse order t -> r)
fsamples = np.concatenate(
tuple(reversed(f_rows[:(t_obs + 1)])), axis=0)
# Compute c1 and d1 vectors (same for all samples)
zeroscal = np.zeros((1,))
c1vec = np.flip(np.concatenate(
(zeroscal, np.cumsum(avec[:t_obs])), axis=None))
d1vec = np.flip(np.concatenate(
(zeroscal, np.cumsum(a2vec[:t_obs])), axis=None))
# Assemble targets for conditional means
ymat = np.concatenate(tuple(reversed(y_rows[t_obs:])), axis=0)
ycols = np.split(ymat, num_samples, axis=1)
assert ycols[0].size == ydim # Sanity check
# v^T v, w^T v for sampled targets
onevec = np.ones((num_samples,))
_issm_params = {
'alpha': alpha * onevec,
'beta': beta * onevec,
'gamma': gamma}
issm_likelihood = issm_likelihood_slow_computations(
targets=[_colvec(v, _np=np) for v in ycols],
issm_params=_issm_params,
r_min=r_min, r_max=r_max, skip_c_d=True)
vtv = issm_likelihood['vtv']
wtv = issm_likelihood['wtv']
# v^T v, w^T v for observed (last entry)
issm_likelihood = issm_likelihood_slow_computations(
targets=[targets],
issm_params=issm_params,
r_min=r_min, r_max=r_max)
vtv = _rowvec(
np.concatenate((vtv, issm_likelihood['vtv']), axis=None), _np=np)
wtv = _rowvec(
np.concatenate((wtv, issm_likelihood['wtv']), axis=None), _np=np)
cscal = issm_likelihood['c'][0]
dscal = issm_likelihood['d'][0]
c1vec = _colvec(c1vec, _np=np)
d1vec = _colvec(d1vec, _np=np)
c2vec = cscal - c1vec
d2vec = dscal - d1vec
# Compute num_samples + 1 conditional mean vectors in one go
denom = vtv * (post_variance + dscal) + 1.0
cond_means = ((post_mean - c1vec) * (d2vec * vtv + 1.0) +
(d1vec + post_variance) * (c2vec * vtv + wtv)) / denom
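        # cond_means layout: columns :num_samples hold the conditional
        # means given each sample's own targets, column num_samples the
        # conditional mean given the observed targets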
fsamples = fsamples - cond_means[:, :num_samples] + _colvec(
cond_means[:, num_samples], _np=np)
# Samples [y_r] from [f_r]
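        # Inverting the recursion: from f_{r-1} = f_r - a_r * (gamma * eps_r + 1)
        # and y_r = f_r + eps_r, we obtain
        #     y_r = ((f_r - f_{r-1}) / a_r - 1) / gamma + f_r,
        # with a_r clamped to <= -MIN_POSTERIOR_VARIANCE so the division
        # stays away from zero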
frmat = fsamples[1:]
frm1mat = fsamples[:-1]
arvec = _colvec(np.minimum(
np.flip(avec[:t_obs]), -MIN_POSTERIOR_VARIANCE), _np=np)
ysamples = ((frmat - frm1mat) / arvec - 1.0) * (1.0 / gamma) + frmat
else:
# Nothing to condition on
fsamples = np.concatenate(tuple(reversed(f_rows)), axis=0)
ysamples = np.concatenate(tuple(reversed(y_rows)), axis=0)
return {
'f': fsamples,
'y': ysamples}
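

# Illustrative sketch (not part of the original module): the unconditional
# (prior) part of the sampling scheme above, isolated into a tiny helper
# using plain NumPy. `h_mean` and `h_var` stand in for the posterior
# marginal of h computed from `poster_state` in `sample_posterior_joint`;
# the other names mirror the arguments there. This is a reading of the
# recursion implemented above for exposition, not part of the library API.
def _demo_sample_issm_prior(
        r_min: int, r_max: int, alpha: float, beta: float, gamma: float,
        h_mean: float, h_var: float, num_samples: int,
        random_state: RandomState) -> Dict:
    num_res = r_max + 1 - r_min
    # a_t = alpha * exp(beta) * (r_max - t)^(alpha - 1), t = 0 at r = r_max
    avec = alpha * np.exp(
        (alpha - 1.0) * np.log(np.arange(r_max, r_min, -1)) + beta)
    epsmat = random_state.normal(size=(num_res, num_samples))
    # f at r_max is h, here drawn from a fixed Gaussian
    fcurr = random_state.normal(
        size=(1, num_samples)) * np.sqrt(h_var) + h_mean
    f_rows, y_rows = [], []
    for t in range(num_res - 1):
        f_rows.append(fcurr)
        y_rows.append(fcurr + epsmat[t])
        fcurr = fcurr - avec[t] * (gamma * epsmat[t] + 1.0)
    f_rows.append(fcurr)
    y_rows.append(fcurr + epsmat[-1])
    # Reverse t order into increasing resource order r_min, ..., r_max
    return {
        'f': np.concatenate(tuple(reversed(f_rows)), axis=0),
        'y': np.concatenate(tuple(reversed(y_rows)), axis=0)}


# Example (made-up parameter values):
#     samples = _demo_sample_issm_prior(
#         r_min=1, r_max=10, alpha=-0.5, beta=0.0, gamma=0.1,
#         h_mean=1.0, h_var=0.25, num_samples=5,
#         random_state=RandomState(0))
#     assert samples['f'].shape == (10, 5)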