optimum/graphcore/generation/on_device_generation.py (321 lines of code) (raw):

# Copyright (c) 2023 Graphcore Ltd. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from dataclasses import dataclass from typing import Optional, Union import poptorch import torch from transformers.generation.utils import LogitsProcessorList from transformers.modeling_outputs import ModelOutput from optimum.utils import logging logger = logging.get_logger(__name__) LARGE_NEGATIVE_CONST = -1e9 @dataclass class OnDeviceGenerationModelOutput(ModelOutput): generated_tokens: torch.Tensor = None done: torch.Tensor = None class OnDeviceGreedySearch(torch.nn.Module): def __init__( self, model: torch.nn.Module, batch_size: int, max_length: int, eos_token_id: int, pad_token_id: int, logits_processor: LogitsProcessorList, ): super().__init__() self.model = model self.batch_size = batch_size self.max_length = max_length self.eos_token_id = eos_token_id self.pad_token_id = pad_token_id self.logits_processor = logits_processor self.input_ids_mask = torch.zeros((batch_size, max_length), dtype=torch.int32) self.input_ids_mask[:, 0] = 1 # Poptorch buffers become constant if kept as int, so set them as float. self.generated_tokens_reset_value = torch.ones(batch_size, max_length, dtype=torch.float32) * pad_token_id self.register_buffer( "generated_tokens", self.generated_tokens_reset_value.clone(), persistent=False, ) def reset_state(self, begin_new_generation: torch.Tensor) -> None: self.generated_tokens.copy_( (1 - begin_new_generation) * self.generated_tokens + begin_new_generation * self.generated_tokens_reset_value.to(self.generated_tokens.device) ) def forward(self, input_ids: torch.Tensor, absolute_step: torch.Tensor, **kwargs) -> torch.Tensor: # Workaround for generic slice assignment self.generated_tokens[:, self.absolute_step] = tokens assert input_ids.shape[-1] == 1 input_ids_mask = self.input_ids_mask.to(input_ids.device) padded_input_ids = torch.nn.functional.pad(input_ids, (0, self.max_length - 1)).int() generated_tokens = input_ids_mask * padded_input_ids + (1 - input_ids_mask) * self.generated_tokens.int() model_input_ids = torch.index_select(generated_tokens, 1, absolute_step - 1) logits = self.model(decoder_input_ids=model_input_ids, **kwargs) if hasattr(logits, "logits"): logits = logits.logits logits = logits.squeeze(1).float() next_tokens_scores = self.logits_processor(generated_tokens, logits, absolute_step=absolute_step) next_tokens = torch.argmax(next_tokens_scores, dim=-1, keepdim=True).int() # If sentence has finished - previous token was EOS - set new tokens to EOS. sentence_eos = model_input_ids == self.eos_token_id sentence_eos &= (absolute_step - 1) != 0 sentence_eos = sentence_eos.int() next_tokens = sentence_eos * self.eos_token_id + (1 - sentence_eos) * next_tokens all_eos = torch.all(next_tokens == self.eos_token_id) self.generated_tokens.copy_( poptorch.dynamic_update(generated_tokens, next_tokens, 1, absolute_step, 1).float() ) return OnDeviceGenerationModelOutput(generated_tokens=next_tokens, done=all_eos) class OnDeviceBeamSearch(torch.nn.Module): """Based on https://github.com/huggingface/transformers/blob/main/src/transformers/generation/flax_utils.py::_beam_search.""" def __init__( self, model: torch.nn.Module, batch_size: int, num_beams: int, max_length: int, eos_token_id: int, pad_token_id: int, logits_processor: LogitsProcessorList, length_penalty: Optional[float] = 1.0, early_stopping: Optional[Union[bool, str]] = None, ): super().__init__() self.model = model self.batch_size = batch_size self.num_beams = num_beams self.max_length = max_length self.eos_token_id = eos_token_id self.pad_token_id = pad_token_id self.logits_processor = logits_processor self.length_penalty = length_penalty self.early_stopping = early_stopping self.input_ids_mask = torch.zeros((batch_size, num_beams, max_length), dtype=torch.int32) self.input_ids_mask[:, :, 0] = 1 # Poptorch buffers become constant if kept as int, so set them as float. self.sequences_reset_value = torch.ones(batch_size, num_beams, max_length, dtype=torch.float32) * pad_token_id self.running_sequences_reset_value = ( torch.ones(batch_size, num_beams, max_length, dtype=torch.float32) * pad_token_id ) self.log_probs_reset_value = torch.ones(batch_size, num_beams) * LARGE_NEGATIVE_CONST self.running_log_probs_reset_value = torch.tensor([[0.0] + [LARGE_NEGATIVE_CONST] * (num_beams - 1)]).repeat( batch_size, 1 ) self.is_finished_reset_value = torch.zeros(batch_size, num_beams, dtype=torch.float32) self._cached_beam_idx_reset_value = torch.arange(batch_size * num_beams, dtype=torch.float32) self.register_buffer("sequences", self.sequences_reset_value.clone(), persistent=False) self.register_buffer( "running_sequences", self.running_sequences_reset_value.clone(), persistent=False, ) self.register_buffer("log_probs", self.log_probs_reset_value.clone(), persistent=False) self.register_buffer( "running_log_probs", self.running_log_probs_reset_value.clone(), persistent=False, ) self.register_buffer("is_finished", self.is_finished_reset_value.clone(), persistent=False) # Holds the beam indices that will be used to permute the KV caches at next time step. self.register_buffer( "_cached_beam_idx", self._cached_beam_idx_reset_value.clone(), persistent=False, ) def reset_state(self, begin_new_generation: torch.Tensor) -> None: self.sequences.copy_( (1 - begin_new_generation) * self.sequences + begin_new_generation * self.sequences_reset_value.to(self.sequences.device) ) self.running_sequences.copy_( (1 - begin_new_generation) * self.running_sequences + begin_new_generation * self.running_sequences_reset_value.to(self.running_sequences.device) ) self.log_probs.copy_( (1 - begin_new_generation) * self.log_probs + begin_new_generation * self.log_probs_reset_value.to(self.log_probs.device) ) self.running_log_probs.copy_( (1 - begin_new_generation) * self.running_log_probs + begin_new_generation * self.running_log_probs_reset_value.to(self.running_log_probs.device) ) self.is_finished.copy_( (1 - begin_new_generation) * self.is_finished + begin_new_generation * self.is_finished_reset_value.to(self.is_finished.device) ) self._cached_beam_idx.copy_( (1 - begin_new_generation) * self._cached_beam_idx + begin_new_generation * self._cached_beam_idx_reset_value.to(self._cached_beam_idx.device) ) def _flatten_beam_dim(self, tensor, num_beams): return tensor.view(self.batch_size * num_beams, *tensor.shape[2:]) def _unflatten_beam_dim(self, tensor, num_beams): return tensor.view(self.batch_size, num_beams, *tensor.shape[1:]) def _gather_beams(self, tensor, beam_indices, batch_size, old_num_beams, new_num_beams): # Ones constraint we have is that index_select must use 1D indices, hence the additional reshaping. if tensor.shape[0] != self.batch_size * old_num_beams: tensor = self._flatten_beam_dim(tensor, old_num_beams) flat_batch_indices = torch.arange(batch_size * new_num_beams) // new_num_beams flat_batch_indices = flat_batch_indices * old_num_beams flat_beam_indices = beam_indices.view(-1) flat_beam_indices = flat_batch_indices + flat_beam_indices gathered_tensor = torch.index_select(tensor, 0, flat_beam_indices) return self._unflatten_beam_dim(gathered_tensor, new_num_beams) def forward(self, input_ids: torch.Tensor, absolute_step: torch.Tensor, **kwargs) -> torch.Tensor: # Workaround for generic slice assignment self.running_sequences[:, self.absolute_step] = tokens assert input_ids.shape[-1] == 1 input_ids_mask = self.input_ids_mask.to(input_ids.device) padded_input_ids = torch.nn.functional.pad(input_ids, (0, self.max_length - 1)) padded_input_ids = self._unflatten_beam_dim(padded_input_ids, self.num_beams).int() # Since we are constrained to keeping buffers in float, including ones holding tokens which are ints, # we do most of the processing as int and cast just before writing. sequences = self.sequences.int() running_sequences = input_ids_mask * padded_input_ids + (1 - input_ids_mask) * self.running_sequences.int() is_finished = self.is_finished.int() # 0. Check for termination of previous step not_max_length_yet = absolute_step < self.max_length worst_finished_score = torch.where( torch.any(is_finished, axis=1), torch.amin(self.log_probs, axis=1), torch.ones(self.batch_size) * LARGE_NEGATIVE_CONST, ) if self.early_stopping == "never" and self.length_penalty > 0.0: best_running_score = self.running_log_probs[:, 0] / (self.max_length**self.length_penalty) else: best_running_score = self.running_log_probs[:, 0] / (absolute_step**self.length_penalty) improvement_still_possible = torch.any(best_running_score > worst_finished_score) still_open_beam = ~(torch.all(is_finished) & (self.early_stopping is True)) continue_search = not_max_length_yet & still_open_beam & improvement_still_possible # 1. Return best beam for each batch and beam indices from previous step # Account for the edge-case where there are no finished sequences for a # particular batch item. If so, return running sequences for that batch item. none_finished = torch.any(is_finished, axis=1) return_sequences = torch.where(none_finished[:, None, None], sequences, running_sequences) return_sequences = return_sequences[:, 0] # 2. Get logits model_input_ids = torch.index_select(running_sequences, 2, absolute_step - 1) model_input_ids = self._flatten_beam_dim(model_input_ids, self.num_beams) logits = self.model(decoder_input_ids=model_input_ids, **kwargs) if hasattr(logits, "logits"): logits = logits.logits logits = logits.squeeze(1).float() # 3. Compute log probs vocab_size = logits.shape[-1] log_probs = torch.nn.functional.log_softmax(logits, dim=-1) log_probs = self.logits_processor(running_sequences, log_probs, absolute_step=absolute_step) log_probs = self._unflatten_beam_dim(log_probs, self.num_beams) log_probs = log_probs + self.running_log_probs.unsqueeze(-1) log_probs = log_probs.view(self.batch_size, self.num_beams * vocab_size) # 4. Retrieve top-2*K beams_to_keep = 2 * self.num_beams topk_log_probs, topk_indices = torch.topk(log_probs, k=beams_to_keep) topk_beam_indices = torch.div(topk_indices, vocab_size).int() topk_running_sequences = self._gather_beams( running_sequences, topk_beam_indices, self.batch_size, self.num_beams, beams_to_keep ) topk_ids = topk_indices % vocab_size topk_sequences = poptorch.dynamic_update( topk_running_sequences, topk_ids.unsqueeze(-1).int(), 2, absolute_step, 1 ) # 5. Check which sequences have ended did_topk_just_finish = topk_ids == self.eos_token_id running_topk_log_probs = topk_log_probs + did_topk_just_finish * LARGE_NEGATIVE_CONST # 6. Get running sequences scores for next next_topk_indices = torch.topk(running_topk_log_probs, k=self.num_beams)[1] next_running_sequences = self._gather_beams( topk_sequences, next_topk_indices, self.batch_size, beams_to_keep, self.num_beams ) next_running_log_probs = self._gather_beams( running_topk_log_probs, next_topk_indices, self.batch_size, beams_to_keep, self.num_beams ) # 7. Process topk logits topk_log_probs = topk_log_probs / (absolute_step**self.length_penalty) beams_in_batch_are_full = is_finished.all(axis=-1, keepdims=True).repeat(1, did_topk_just_finish.shape[-1]) & ( self.early_stopping is True ) add_penalty = ~did_topk_just_finish | beams_in_batch_are_full topk_log_probs += add_penalty * LARGE_NEGATIVE_CONST # 8. Get scores, sequences, is sentence finished for next. merged_sequences = torch.cat([sequences, topk_sequences], axis=1) merged_log_probs = torch.cat([self.log_probs, topk_log_probs], axis=1) merged_is_finished = torch.cat([is_finished, did_topk_just_finish], axis=1) topk_merged_indices = torch.topk(merged_log_probs, k=self.num_beams)[1] next_sequences = self._gather_beams( merged_sequences, topk_merged_indices, self.batch_size, 3 * self.num_beams, self.num_beams ) next_log_probs = self._gather_beams( merged_log_probs, topk_merged_indices, self.batch_size, 3 * self.num_beams, self.num_beams ) next_is_finished = self._gather_beams( merged_is_finished, topk_merged_indices, self.batch_size, 3 * self.num_beams, self.num_beams ) # 9. Determine the top k beam indices from the original set of all beams. next_running_indices = self._gather_beams( topk_beam_indices, next_topk_indices, self.batch_size, 2 * self.num_beams, self.num_beams ) flat_batch_indices = torch.arange(self.batch_size * self.num_beams) // self.num_beams flat_batch_indices = flat_batch_indices * self.num_beams beam_indices = self._flatten_beam_dim(next_running_indices, self.num_beams) beam_indices = beam_indices + flat_batch_indices self.sequences.copy_(next_sequences.float()) self.running_sequences.copy_(next_running_sequences.float()) self.log_probs.copy_(next_log_probs) self.running_log_probs.copy_(next_running_log_probs) self.is_finished.copy_(next_is_finished.float()) self._cached_beam_idx.copy_(beam_indices.float()) return OnDeviceGenerationModelOutput( generated_tokens=return_sequences, done=~continue_search, ) class OnDeviceGenerationModel(torch.nn.Module): """ A wrapper around a user generation model that effectively runs the entire generation loop on device for a selected number of steps before returning to host. Each generated token is stored in appropriate buffers on device. We currently support greedy search and beam search. For beam search, the additional state is similarly stored in buffers on device. Let B = batch size * num beams, C = context length, S = max length. The input token tensor is of shape [B, C]. To do D on-device generation steps at a time, the device iterations should be set to D, for example D=16. If using this class outside of the `IPUGenerationMixin`, the user must ensure generation does not exceed the max sequence length of S. Current limitations: a) context length is restricted to C=1; b) user generation model must have KV caching enabled; c) poptorch execution strategy must be poptorch.ShardedExecution. """ def __init__( self, model: torch.nn.Module, batch_size: int, max_length: int, eos_token_id: int, pad_token_id: int, logits_processor: LogitsProcessorList, num_beams: Optional[int] = 1, use_cache: Optional[bool] = True, length_penalty: Optional[float] = 1.0, early_stopping: Optional[bool] = False, ): super().__init__() if not use_cache: raise NotImplementedError("On device generation assumes `use_cache=True`.") if isinstance(eos_token_id, list): if len(eos_token_id) > 1: raise ValueError("Multiple EOS tokens are not yet supported for on-device generation.") eos_token_id = eos_token_id[0] self.max_length = max_length self.context_length = 1 if num_beams == 1: self.generation_strategy = OnDeviceGreedySearch( model, batch_size=batch_size, max_length=max_length, eos_token_id=eos_token_id, pad_token_id=pad_token_id, logits_processor=logits_processor, ) else: self.generation_strategy = OnDeviceBeamSearch( model, batch_size=batch_size, num_beams=num_beams, max_length=max_length, eos_token_id=eos_token_id, pad_token_id=pad_token_id, logits_processor=logits_processor, length_penalty=length_penalty, early_stopping=early_stopping, ) # Poptorch buffers become constant if kept as int, so set them as float. self.register_buffer("_generation_step", torch.tensor([0], dtype=torch.int32), persistent=False) def _reset_generation_step(self, begin_new_generation: torch.Tensor) -> None: generation_step = (1 - begin_new_generation) * self._generation_step self._generation_step.copy_(generation_step) def forward(self, **kwargs): input_ids_key = "decoder_input_ids" input_ids = kwargs.pop(input_ids_key, None) if input_ids is None: input_ids_key = "input_ids" input_ids = kwargs.pop(input_ids_key, None) if input_ids is None: raise ValueError( "The on device generation model was called with kwargs that are missing both `decoder_input_ids` " "and `input_ids`. Please provide one of these as inputs (default is `decoder_input_ids`)." ) if input_ids.shape[-1] > 1: raise ValueError("Context length (input_ids.shape[-1]) > 1 is not supported yet.") if generation_step := kwargs.pop("generation_step", None) is not None: self._generation_step.copy_(generation_step) absolute_step = self._generation_step + self.context_length # Make sure generation_step does not go out of bounds. self._generation_step.copy_(self._generation_step % self.max_length) # Reset on-device state buffers when starting generation anew. begin_new_generation = (self._generation_step == 0).int() self.generation_strategy.reset_state(begin_new_generation) self._reset_generation_step(begin_new_generation) outputs = self.generation_strategy(input_ids, absolute_step, **kwargs) if not isinstance(outputs, OnDeviceGenerationModelOutput): raise TypeError( f"Unexpected type {type(outputs)} returned from {self.generation_strategy.__class__.__name__}." ) self._generation_step.copy_(self._generation_step + 1) return outputs