in lerobot/common/policies/vqbet/modeling_vqbet.py [0:0]
def forward(self, x, **kwargs) -> dict:
# N is the batch size, and T is number of action query tokens, which are process through same GPT
N, T, _ = x.shape
# we calculate N and T side parallelly. Thus, the dimensions would be
# (batch size * number of action query tokens, action chunk size, action dimension)
x = einops.rearrange(x, "N T WA -> (N T) WA")
# sample offsets
cbet_offsets = self.map_to_cbet_preds_offset(x)
cbet_offsets = einops.rearrange(
cbet_offsets,
"(NT) (G C WA) -> (NT) G C WA",
G=self.vqvae_model.vqvae_num_layers,
C=self.config.vqvae_n_embed,
)
# if self.config.sequentially_select is True, bin prediction head first sample the primary code, and then sample secondary code
if self.config.sequentially_select:
cbet_primary_logits = self.map_to_cbet_preds_primary_bin(x)
# select primary bin first
cbet_primary_probs = torch.softmax(
cbet_primary_logits / self.config.bet_softmax_temperature, dim=-1
)
NT, choices = cbet_primary_probs.shape
sampled_primary_centers = einops.rearrange(
torch.multinomial(cbet_primary_probs.view(-1, choices), num_samples=1),
"(NT) 1 -> NT",
NT=NT,
)
cbet_secondary_logits = self.map_to_cbet_preds_secondary_bin(
torch.cat(
(x, F.one_hot(sampled_primary_centers, num_classes=self.config.vqvae_n_embed)),
axis=1,
)
)
cbet_secondary_probs = torch.softmax(
cbet_secondary_logits / self.config.bet_softmax_temperature, dim=-1
)
sampled_secondary_centers = einops.rearrange(
torch.multinomial(cbet_secondary_probs.view(-1, choices), num_samples=1),
"(NT) 1 -> NT",
NT=NT,
)
sampled_centers = torch.stack((sampled_primary_centers, sampled_secondary_centers), axis=1)
cbet_logits = torch.stack([cbet_primary_logits, cbet_secondary_logits], dim=1)
# if self.config.sequentially_select is False, bin prediction head samples primary and secondary code at once.
else:
cbet_logits = self.map_to_cbet_preds_bin(x)
cbet_logits = einops.rearrange(
cbet_logits, "(NT) (G C) -> (NT) G C", G=self.vqvae_model.vqvae_num_layers
)
cbet_probs = torch.softmax(cbet_logits / self.config.bet_softmax_temperature, dim=-1)
NT, G, choices = cbet_probs.shape
sampled_centers = einops.rearrange(
torch.multinomial(cbet_probs.view(-1, choices), num_samples=1),
"(NT G) 1 -> NT G",
NT=NT,
)
device = get_device_from_parameters(self)
indices = (
torch.arange(NT, device=device).unsqueeze(1),
torch.arange(self.vqvae_model.vqvae_num_layers, device=device).unsqueeze(0),
sampled_centers,
)
# Use advanced indexing to sample the values (Extract the only offsets corresponding to the sampled codes.)
sampled_offsets = cbet_offsets[indices]
# Then, sum the offsets over the RVQ layers to get a net offset for the bin prediction
sampled_offsets = sampled_offsets.sum(dim=1)
with torch.no_grad():
# Get the centroids (= vectors corresponding to the codes) of each layer to pass it through RVQ decoder
return_decoder_input = self.vqvae_model.get_embeddings_from_code(sampled_centers).clone().detach()
# pass the centroids through decoder to get actions.
decoded_action = self.vqvae_model.get_action_from_latent(return_decoder_input).clone().detach()
# reshaped extracted offset to match with decoded centroids
sampled_offsets = einops.rearrange(
sampled_offsets, "NT (W A) -> NT W A", W=self.config.action_chunk_size
)
# add offset and decoded centroids
predicted_action = decoded_action + sampled_offsets
predicted_action = einops.rearrange(
predicted_action,
"(N T) W A -> N T (W A)",
N=N,
T=T,
W=self.config.action_chunk_size,
)
return {
"cbet_logits": cbet_logits,
"predicted_action": predicted_action,
"sampled_centers": sampled_centers,
"decoded_action": decoded_action,
}