arctic_inference/vllm/spec_dec/logits_processor_opt.py (67 lines of code) (raw):
# SPDX-License-Identifier: Apache-2.0
"""A layer that computes logits from hidden_states."""
from typing import Optional
import torch
import torch.nn as nn
import vllm.envs as envs
from vllm.distributed import (tensor_model_parallel_all_gather,
tensor_model_parallel_gather)
from arctic_inference.vllm.spec_dec.vocab_parallel_embedding import (
VocabParallelEmbedding)
from vllm.platforms import current_platform
class LogitsProcessorOpt(nn.Module):
    """Turn model hidden states into (post-processed) vocabulary logits.

    This layer does the following:
    1. Compute logits from hidden states with the LM head, optionally
       gather them across tensor-parallel ranks, and drop vocab entries
       beyond ``org_vocab_size``. This step is skipped when the input
       already is logits (``logits_as_input=True``).
    2. Soft-cap the logits if ``soft_cap`` is set.
    3. Scale the logits if ``scale`` is not 1.0.
    """

    def __init__(self,
                 vocab_size: int,
                 org_vocab_size: Optional[int] = None,
                 scale: float = 1.0,
                 logits_as_input: bool = False,
                 soft_cap: Optional[float] = None,
                 skip_last_gather: bool = False) -> None:
        """
        Args:
            vocab_size: Vocabulary size; stored on the layer.
            org_vocab_size: Original vocabulary size (without LoRA). Falls
                back to ``vocab_size`` when ``None`` (or 0).
            scale: A scaling factor to apply to the logits.
            logits_as_input: If True, the tensor passed to ``forward`` is
                treated as logits and the LM head is not applied.
            soft_cap: If set, logits are squashed with
                ``soft_cap * tanh(logits / soft_cap)``. Used in Gemma 2.
            skip_last_gather: If True, the tensor-parallel gather after the
                LM head is skipped and the un-gathered logits are returned.
        """
        super().__init__()
        self.scale = scale
        self.vocab_size = vocab_size
        # Whether the input is logits (default is hidden states).
        self.logits_as_input = logits_as_input
        # original vocabulary size (without LoRA).
        self.org_vocab_size = org_vocab_size or vocab_size
        # Soft cap the logits. Used in Gemma 2.
        self.soft_cap = soft_cap
        # Whether to use gather or all-gather to gather the logits.
        self.use_gather = (not current_platform.is_tpu()
                           and not envs.VLLM_USE_V1)
        self.skip_last_gather = skip_last_gather

    def _get_logits_and_post_processing(
        self,
        lm_head: VocabParallelEmbedding,
        hidden_states: torch.Tensor,
        embedding_bias: Optional[torch.Tensor] = None,
    ) -> Optional[torch.Tensor]:
        """Compute logits (unless given as input), then soft-cap and scale.

        Args:
            lm_head: Layer whose ``quant_method.apply`` produces the logits.
                Not used when ``logits_as_input`` is True.
            hidden_states: Hidden states, or the logits themselves when
                ``logits_as_input`` is True.
            embedding_bias: Optional bias forwarded to the LM head.

        Returns:
            The processed logits, or ``None`` when the gather step returned
            ``None`` on this rank.
        """
        if self.logits_as_input:
            # The caller already provides logits. Without this branch the
            # name ``logits`` would be unbound and the method would raise
            # UnboundLocalError.
            logits = hidden_states
        else:
            # Get the logits for the next tokens.
            logits = self._get_logits(hidden_states, lm_head, embedding_bias)

        if logits is None:
            return None

        if self.soft_cap is not None:
            logits = logits / self.soft_cap
            logits = torch.tanh(logits)
            logits = logits * self.soft_cap

        if self.scale != 1.0:
            # In-place to avoid another vocab-sized allocation. Note that with
            # logits_as_input=True and no soft cap this modifies the tensor
            # the caller passed in.
            logits *= self.scale
        return logits

    def forward(
        self,
        lm_head: VocabParallelEmbedding,
        hidden_states: torch.Tensor,
    ) -> Optional[torch.Tensor]:
        """Return processed logits for ``hidden_states`` (no embedding bias).

        Returns ``None`` when the gather step yields ``None`` on this rank.
        """
        logits = self._get_logits_and_post_processing(lm_head, hidden_states,
                                                      None)
        return logits

    def _get_logits(
        self,
        hidden_states: torch.Tensor,
        lm_head: VocabParallelEmbedding,
        embedding_bias: Optional[torch.Tensor],
    ) -> Optional[torch.Tensor]:
        """Apply the LM head, optionally gather, and trim to the original vocab.

        Args:
            hidden_states: Input passed to ``lm_head.quant_method.apply``.
            lm_head: Layer providing ``quant_method.apply``.
            embedding_bias: Optional bias forwarded as ``bias=``.

        Returns:
            Logits sliced to ``[..., :org_vocab_size]``, or ``None`` if the
            gather returned ``None``.
        """
        logits = lm_head.quant_method.apply(lm_head,
                                            hidden_states,
                                            bias=embedding_bias)
        if not self.skip_last_gather:
            if self.use_gather:
                # None may be returned for rank > 0
                logits = tensor_model_parallel_gather(logits)
            else:
                # Gather is not supported for some devices such as TPUs.
                # Use all-gather instead.
                # NOTE(woosuk): Here, the outputs of every device should not
                # be None because XLA requires strict SPMD among all devices.
                # Every device should execute the same operations after
                # gathering the logits.
                logits = tensor_model_parallel_all_gather(logits)
        # Remove paddings in vocab (if any).
        # NOTE(review): with skip_last_gather=True this slices the un-gathered
        # tensor; presumably the caller gathers/trims later -- confirm.
        if logits is not None:
            logits = logits[..., :self.org_vocab_size]
        return logits