in botorch/acquisition/multi_objective/multi_output_risk_measures.py [0:0]
def get_mvar_set_cpu(self, Y: Tensor) -> Tensor:
r"""Find MVaR set based on the definition in [Prekopa2012MVaR]_.
NOTE: This is much faster on CPU for large `n_w` than the alternative but it
is significantly slower on GPU. Based on empirical evidence, this is recommended
when running on CPU with `n_w > 64`.
This first calculates the CDF for each point on the extended domain of the
random variable (the grid defined by the given samples), then takes the
values with CDF equal to (rounded if necessary) `alpha`. The non-dominated
subset of these form the MVaR set.
Args:
Y: A `batch x n_w x m`-dim tensor of outcomes. This is currently
restricted to `m = 2` objectives.
TODO: Support `m > 2` objectives.
Returns:
A `batch` length list of `k x m`-dim tensor of MVaR values, where `k`
depends on the corresponding batch inputs. Note that MVaR values in general
are not in-sample points.
"""
if Y.dim() == 3:
return [self.get_mvar_set_cpu(y_) for y_ in Y]
m = Y.shape[-1]
if m != 2: # pragma: no cover
raise ValueError("`get_mvar_set_cpu` only supports `m=2` outcomes!")
# Generate sets of all unique values in each output dimension.
# Note that points in MVaR are bounded from above by the
# independent VaR of each objective. Hence, we only need to
# consider the unique outcomes that are less than or equal to
# the VaR of the independent objectives
var_alpha_idx = ceil(self.alpha * self.n_w) - 1
Y_sorted = Y.topk(Y.shape[0] - var_alpha_idx, dim=0, largest=False).values
unique_outcomes_list = [
Y_sorted[:, i].unique().tolist()[::-1] for i in range(m)
]
# Convert this into a list of m dictionaries mapping values to indices.
unique_outcomes = [
dict(zip(outcomes, range(len(outcomes))))
for outcomes in unique_outcomes_list
]
# Initialize a tensor counting the number of points in Y that a given grid point
# is dominated by. This will essentially be a non-normalized CDF.
counter_tensor = torch.zeros(
[len(outcomes) for outcomes in unique_outcomes],
dtype=torch.long,
device=Y.device,
)
# populate the tensor, counting the dominated points.
# we only need to consider points in Y where at least one
# objective is less than the max objective value in
# unique_outcomes_list
max_vals = torch.tensor(
[o[0] for o in unique_outcomes_list], dtype=Y.dtype, device=Y.device
)
mask = (Y < max_vals).any(dim=-1)
counter_tensor += self.n_w - mask.sum()
Y_pruned = Y[mask]
for y_ in Y_pruned:
starting_idcs = [unique_outcomes[i].get(y_[i].item(), 0) for i in range(m)]
counter_tensor[starting_idcs[0] :, starting_idcs[1] :] += 1
# Get the count alpha-level points should have.
alpha_count = ceil(self.alpha * self.n_w)
# Get the alpha level indices.
alpha_level_indices = (counter_tensor == alpha_count).nonzero(as_tuple=False)
# If there are no exact alpha level points, get the smallest alpha' > alpha
# and find the corresponding alpha level indices.
if alpha_level_indices.numel() == 0:
min_greater_than_alpha = counter_tensor[counter_tensor > alpha_count].min()
alpha_level_indices = (counter_tensor == min_greater_than_alpha).nonzero(
as_tuple=False
)
unique_outcomes = [
torch.as_tensor(list(outcomes.keys()), device=Y.device, dtype=Y.dtype)
for outcomes in unique_outcomes
]
alpha_level_points = torch.stack(
[
unique_outcomes[i][alpha_level_indices[:, i]]
for i in range(len(unique_outcomes))
],
dim=-1,
)
# MVaR is simply the non-dominated subset of alpha level points.
if self.filter_dominated:
mask = is_non_dominated(alpha_level_points)
mvar = alpha_level_points[mask]
else:
mvar = alpha_level_points
return mvar