# Copyright (c) 2023 Graphcore Ltd. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass
from typing import Optional, Union

import poptorch
import torch
from transformers.generation.utils import LogitsProcessorList
from transformers.modeling_outputs import ModelOutput

from optimum.utils import logging


# Module-level logger obtained from optimum's logging utilities.
logger = logging.get_logger(__name__)


# Large negative score used in beam search: it is the reset value of the beam log-probabilities
# and is added to candidate scores so that those candidates lose any subsequent top-k selection.
LARGE_NEGATIVE_CONST = -1e9


@dataclass
class OnDeviceGenerationModelOutput(ModelOutput):
    """Output returned by the `forward` of the on-device generation strategies defined in this module."""

    # Greedy search: the tokens selected at the current step.
    # Beam search: for each batch item, the first beam of the finished sequences, or of the
    # running sequences when no beam has finished yet.
    generated_tokens: Optional[torch.Tensor] = None
    # Termination flag: every next token is EOS (greedy search), or the beam search
    # continuation condition no longer holds.
    done: Optional[torch.Tensor] = None


class OnDeviceGreedySearch(torch.nn.Module):
    """
    One greedy search step whose generation state is held in a module buffer.

    Every call to `forward` feeds the token stored at position `absolute_step - 1` to the wrapped model,
    selects the highest scoring next token and writes it at position `absolute_step` of the
    `generated_tokens` buffer.
    """

    def __init__(
        self,
        model: torch.nn.Module,
        batch_size: int,
        max_length: int,
        eos_token_id: int,
        pad_token_id: int,
        logits_processor: LogitsProcessorList,
    ):
        """
        Args:
            model: module invoked as `model(decoder_input_ids=..., **kwargs)`.
            batch_size: number of rows of the token buffer.
            max_length: number of columns of the token buffer.
            eos_token_id: token id that marks a finished sentence.
            pad_token_id: value the token buffer is (re)initialised with.
            logits_processor: callable applied to the logits before the argmax.
        """
        super().__init__()
        self.model = model
        self.batch_size = batch_size
        self.max_length = max_length
        self.eos_token_id = eos_token_id
        self.pad_token_id = pad_token_id
        self.logits_processor = logits_processor

        # Mask that is 1 in the first column only; used in `forward` to place `input_ids` at position 0.
        self.input_ids_mask = torch.zeros((batch_size, max_length), dtype=torch.int32)
        self.input_ids_mask[:, 0] = 1

        # Poptorch buffers become constant if kept as int, so set them as float.
        self.generated_tokens_reset_value = torch.ones(batch_size, max_length, dtype=torch.float32) * pad_token_id
        self.register_buffer(
            "generated_tokens",
            self.generated_tokens_reset_value.clone(),
            persistent=False,
        )

    def reset_state(self, begin_new_generation: torch.Tensor) -> None:
        """
        Blend the token buffer with its reset value.

        Args:
            begin_new_generation: multiplicative selector; where it is 1 the buffer takes the reset
                (pad) value, where it is 0 the buffer keeps its current content.
        """
        self.generated_tokens.copy_(
            (1 - begin_new_generation) * self.generated_tokens
            + begin_new_generation * self.generated_tokens_reset_value.to(self.generated_tokens.device)
        )

    def forward(
        self, input_ids: torch.Tensor, absolute_step: torch.Tensor, **kwargs
    ) -> OnDeviceGenerationModelOutput:
        """
        Run one greedy decoding step.

        Args:
            input_ids: tokens whose last dimension must be 1; they are placed at position 0 of the history.
            absolute_step: index (as a tensor) of the position to be written at this step.
            **kwargs: forwarded unchanged to the wrapped model.

        Returns:
            `OnDeviceGenerationModelOutput` with `generated_tokens` set to the tokens selected at this
            step and `done` set to whether all of them are EOS.
        """
        # Workaround for generic slice assignment self.generated_tokens[:, self.absolute_step] = tokens
        assert input_ids.shape[-1] == 1
        input_ids_mask = self.input_ids_mask.to(input_ids.device)
        # Right-pad `input_ids` to `max_length` so it can be merged with the buffer via the mask.
        padded_input_ids = torch.nn.functional.pad(input_ids, (0, self.max_length - 1)).int()
        generated_tokens = input_ids_mask * padded_input_ids + (1 - input_ids_mask) * self.generated_tokens.int()

        # Only the previously generated token is fed to the model.
        model_input_ids = torch.index_select(generated_tokens, 1, absolute_step - 1)
        logits = self.model(decoder_input_ids=model_input_ids, **kwargs)
        if hasattr(logits, "logits"):
            logits = logits.logits
        logits = logits.squeeze(1).float()
        next_tokens_scores = self.logits_processor(generated_tokens, logits, absolute_step=absolute_step)

        next_tokens = torch.argmax(next_tokens_scores, dim=-1, keepdim=True).int()
        # If sentence has finished - previous token was EOS - set new tokens to EOS.
        sentence_eos = model_input_ids == self.eos_token_id
        # An EOS found at position 0 does not count as a finished sentence.
        sentence_eos &= (absolute_step - 1) != 0
        sentence_eos = sentence_eos.int()
        next_tokens = sentence_eos * self.eos_token_id + (1 - sentence_eos) * next_tokens
        all_eos = torch.all(next_tokens == self.eos_token_id)

        # Write the new tokens at `absolute_step` and store the history back as float.
        self.generated_tokens.copy_(
            poptorch.dynamic_update(generated_tokens, next_tokens, 1, absolute_step, 1).float()
        )

        return OnDeviceGenerationModelOutput(generated_tokens=next_tokens, done=all_eos)


class OnDeviceBeamSearch(torch.nn.Module):
    """
    One beam search step whose state (finished and running sequences, their scores, the finished flags
    and the beam indices of the last step) is held in module buffers.

    Based on https://github.com/huggingface/transformers/blob/main/src/transformers/generation/flax_utils.py::_beam_search.
    """

    def __init__(
        self,
        model: torch.nn.Module,
        batch_size: int,
        num_beams: int,
        max_length: int,
        eos_token_id: int,
        pad_token_id: int,
        logits_processor: LogitsProcessorList,
        length_penalty: Optional[float] = 1.0,
        early_stopping: Optional[Union[bool, str]] = None,
    ):
        """
        Args:
            model: module invoked as `model(decoder_input_ids=..., **kwargs)`.
            batch_size: number of batch items.
            num_beams: number of beams kept per batch item.
            max_length: length of the sequence buffers.
            eos_token_id: token id that marks a finished sequence.
            pad_token_id: value the sequence buffers are (re)initialised with.
            logits_processor: callable applied to the log-probabilities before the top-k.
            length_penalty: exponent applied to the step when normalising scores.
            early_stopping: `True`, `"never"` or any other value; see the termination check in `forward`.
        """
        super().__init__()
        self.model = model
        self.batch_size = batch_size
        self.num_beams = num_beams
        self.max_length = max_length
        self.eos_token_id = eos_token_id
        self.pad_token_id = pad_token_id
        self.logits_processor = logits_processor
        self.length_penalty = length_penalty
        self.early_stopping = early_stopping

        # Mask that is 1 at sequence position 0 only; used in `forward` to place `input_ids` there.
        self.input_ids_mask = torch.zeros((batch_size, num_beams, max_length), dtype=torch.int32)
        self.input_ids_mask[:, :, 0] = 1

        # Poptorch buffers become constant if kept as int, so set them as float.
        self.sequences_reset_value = torch.ones(batch_size, num_beams, max_length, dtype=torch.float32) * pad_token_id
        self.running_sequences_reset_value = (
            torch.ones(batch_size, num_beams, max_length, dtype=torch.float32) * pad_token_id
        )
        self.log_probs_reset_value = torch.ones(batch_size, num_beams) * LARGE_NEGATIVE_CONST
        # Only the first running beam starts with score 0; the others start at LARGE_NEGATIVE_CONST.
        self.running_log_probs_reset_value = torch.tensor([[0.0] + [LARGE_NEGATIVE_CONST] * (num_beams - 1)]).repeat(
            batch_size, 1
        )
        self.is_finished_reset_value = torch.zeros(batch_size, num_beams, dtype=torch.float32)
        self._cached_beam_idx_reset_value = torch.arange(batch_size * num_beams, dtype=torch.float32)
        self.register_buffer("sequences", self.sequences_reset_value.clone(), persistent=False)
        self.register_buffer(
            "running_sequences",
            self.running_sequences_reset_value.clone(),
            persistent=False,
        )
        self.register_buffer("log_probs", self.log_probs_reset_value.clone(), persistent=False)
        self.register_buffer(
            "running_log_probs",
            self.running_log_probs_reset_value.clone(),
            persistent=False,
        )
        self.register_buffer("is_finished", self.is_finished_reset_value.clone(), persistent=False)
        # Holds the beam indices that will be used to permute the KV caches at next time step.
        self.register_buffer(
            "_cached_beam_idx",
            self._cached_beam_idx_reset_value.clone(),
            persistent=False,
        )

    def reset_state(self, begin_new_generation: torch.Tensor) -> None:
        """
        Blend every state buffer with its reset value.

        Args:
            begin_new_generation: multiplicative selector; where it is 1 the buffers take their reset
                value, where it is 0 they keep their current content.
        """
        self.sequences.copy_(
            (1 - begin_new_generation) * self.sequences
            + begin_new_generation * self.sequences_reset_value.to(self.sequences.device)
        )
        self.running_sequences.copy_(
            (1 - begin_new_generation) * self.running_sequences
            + begin_new_generation * self.running_sequences_reset_value.to(self.running_sequences.device)
        )
        self.log_probs.copy_(
            (1 - begin_new_generation) * self.log_probs
            + begin_new_generation * self.log_probs_reset_value.to(self.log_probs.device)
        )
        self.running_log_probs.copy_(
            (1 - begin_new_generation) * self.running_log_probs
            + begin_new_generation * self.running_log_probs_reset_value.to(self.running_log_probs.device)
        )
        self.is_finished.copy_(
            (1 - begin_new_generation) * self.is_finished
            + begin_new_generation * self.is_finished_reset_value.to(self.is_finished.device)
        )
        self._cached_beam_idx.copy_(
            (1 - begin_new_generation) * self._cached_beam_idx
            + begin_new_generation * self._cached_beam_idx_reset_value.to(self._cached_beam_idx.device)
        )

    def _flatten_beam_dim(self, tensor: torch.Tensor, num_beams: int) -> torch.Tensor:
        """View `tensor` with its first two dims merged into one of size `batch_size * num_beams`."""
        return tensor.view(self.batch_size * num_beams, *tensor.shape[2:])

    def _unflatten_beam_dim(self, tensor: torch.Tensor, num_beams: int) -> torch.Tensor:
        """View `tensor` with its first dim split into `(batch_size, num_beams)`."""
        return tensor.view(self.batch_size, num_beams, *tensor.shape[1:])

    def _gather_beams(
        self,
        tensor: torch.Tensor,
        beam_indices: torch.Tensor,
        batch_size: int,
        old_num_beams: int,
        new_num_beams: int,
    ) -> torch.Tensor:
        """
        Select, for every batch item, the beams of `tensor` listed in `beam_indices`.

        Args:
            tensor: values holding `old_num_beams` beams per batch item; it is flattened first
                unless its leading dim already equals `batch_size * old_num_beams`.
            beam_indices: per-batch-item beam indices, `new_num_beams` of them per batch item.
            batch_size: number of batch items.
            old_num_beams: number of beams per batch item in `tensor`.
            new_num_beams: number of beams per batch item in the result.

        Returns:
            The gathered values viewed with leading dims `(batch_size, new_num_beams)`.
        """
        # One constraint we have is that index_select must use 1D indices, hence the additional reshaping.
        if tensor.shape[0] != self.batch_size * old_num_beams:
            tensor = self._flatten_beam_dim(tensor, old_num_beams)
        # Offset of each batch item's first beam in the flattened tensor.
        flat_batch_indices = torch.arange(batch_size * new_num_beams) // new_num_beams
        flat_batch_indices = flat_batch_indices * old_num_beams
        flat_beam_indices = beam_indices.view(-1)
        flat_beam_indices = flat_batch_indices + flat_beam_indices
        gathered_tensor = torch.index_select(tensor, 0, flat_beam_indices)
        return self._unflatten_beam_dim(gathered_tensor, new_num_beams)

    def forward(
        self, input_ids: torch.Tensor, absolute_step: torch.Tensor, **kwargs
    ) -> OnDeviceGenerationModelOutput:
        """
        Run one beam search step and update the state buffers.

        Args:
            input_ids: flattened (batch * beams) tokens whose last dimension must be 1; they are placed
                at position 0 of the running sequences.
            absolute_step: index (as a tensor) of the position to be written at this step.
            **kwargs: forwarded unchanged to the wrapped model.

        Returns:
            `OnDeviceGenerationModelOutput` with `generated_tokens` set to the first beam of the state
            held before this step's update and `done` set to the negated continuation condition.
        """
        # Workaround for generic slice assignment self.running_sequences[:, self.absolute_step] = tokens
        assert input_ids.shape[-1] == 1
        input_ids_mask = self.input_ids_mask.to(input_ids.device)
        padded_input_ids = torch.nn.functional.pad(input_ids, (0, self.max_length - 1))
        padded_input_ids = self._unflatten_beam_dim(padded_input_ids, self.num_beams).int()

        # Since we are constrained to keeping buffers in float, including ones holding tokens which are ints,
        # we do most of the processing as int and cast just before writing.
        sequences = self.sequences.int()
        running_sequences = input_ids_mask * padded_input_ids + (1 - input_ids_mask) * self.running_sequences.int()
        is_finished = self.is_finished.int()

        # 0. Check for termination of previous step
        not_max_length_yet = absolute_step < self.max_length

        # Lowest finished score per batch item, or LARGE_NEGATIVE_CONST when no beam has finished.
        worst_finished_score = torch.where(
            torch.any(is_finished, axis=1),
            torch.amin(self.log_probs, axis=1),
            torch.ones(self.batch_size) * LARGE_NEGATIVE_CONST,
        )
        if self.early_stopping == "never" and self.length_penalty > 0.0:
            best_running_score = self.running_log_probs[:, 0] / (self.max_length**self.length_penalty)
        else:
            best_running_score = self.running_log_probs[:, 0] / (absolute_step**self.length_penalty)
        improvement_still_possible = torch.any(best_running_score > worst_finished_score)

        still_open_beam = ~(torch.all(is_finished) & (self.early_stopping is True))

        continue_search = not_max_length_yet & still_open_beam & improvement_still_possible

        # 1. Return best beam for each batch and beam indices from previous step
        # Account for the edge-case where there are no finished sequences for a
        # particular batch item. If so, return running sequences for that batch item.
        any_finished = torch.any(is_finished, axis=1)
        return_sequences = torch.where(any_finished[:, None, None], sequences, running_sequences)
        return_sequences = return_sequences[:, 0]

        # 2. Get logits
        model_input_ids = torch.index_select(running_sequences, 2, absolute_step - 1)
        model_input_ids = self._flatten_beam_dim(model_input_ids, self.num_beams)

        logits = self.model(decoder_input_ids=model_input_ids, **kwargs)
        if hasattr(logits, "logits"):
            logits = logits.logits
        logits = logits.squeeze(1).float()

        # 3. Compute log probs
        vocab_size = logits.shape[-1]
        log_probs = torch.nn.functional.log_softmax(logits, dim=-1)
        # NOTE(review): `running_sequences` is passed with separate batch and beam dims while
        # `log_probs` is still flattened here - confirm the processors expect this.
        log_probs = self.logits_processor(running_sequences, log_probs, absolute_step=absolute_step)
        log_probs = self._unflatten_beam_dim(log_probs, self.num_beams)
        log_probs = log_probs + self.running_log_probs.unsqueeze(-1)
        log_probs = log_probs.view(self.batch_size, self.num_beams * vocab_size)

        # 4. Retrieve top-2*K
        beams_to_keep = 2 * self.num_beams
        topk_log_probs, topk_indices = torch.topk(log_probs, k=beams_to_keep)
        # Split each flat (beam, token) index into the originating beam and the token id.
        topk_beam_indices = torch.div(topk_indices, vocab_size).int()
        topk_running_sequences = self._gather_beams(
            running_sequences, topk_beam_indices, self.batch_size, self.num_beams, beams_to_keep
        )
        topk_ids = topk_indices % vocab_size
        topk_sequences = poptorch.dynamic_update(
            topk_running_sequences, topk_ids.unsqueeze(-1).int(), 2, absolute_step, 1
        )

        # 5. Check which sequences have ended
        did_topk_just_finish = topk_ids == self.eos_token_id
        # Candidates that just produced EOS must not stay among the running beams.
        running_topk_log_probs = topk_log_probs + did_topk_just_finish * LARGE_NEGATIVE_CONST

        # 6. Get running sequences scores for next
        next_topk_indices = torch.topk(running_topk_log_probs, k=self.num_beams)[1]

        next_running_sequences = self._gather_beams(
            topk_sequences, next_topk_indices, self.batch_size, beams_to_keep, self.num_beams
        )
        next_running_log_probs = self._gather_beams(
            running_topk_log_probs, next_topk_indices, self.batch_size, beams_to_keep, self.num_beams
        )

        # 7. Process topk logits
        topk_log_probs = topk_log_probs / (absolute_step**self.length_penalty)
        beams_in_batch_are_full = is_finished.all(axis=-1, keepdims=True).repeat(1, did_topk_just_finish.shape[-1]) & (
            self.early_stopping is True
        )
        # Only candidates that just finished (and whose batch item is not already full) may enter the finished set.
        add_penalty = ~did_topk_just_finish | beams_in_batch_are_full
        topk_log_probs += add_penalty * LARGE_NEGATIVE_CONST

        # 8. Get scores, sequences, is sentence finished for next.
        merged_sequences = torch.cat([sequences, topk_sequences], axis=1)
        merged_log_probs = torch.cat([self.log_probs, topk_log_probs], axis=1)
        merged_is_finished = torch.cat([is_finished, did_topk_just_finish], axis=1)
        topk_merged_indices = torch.topk(merged_log_probs, k=self.num_beams)[1]
        next_sequences = self._gather_beams(
            merged_sequences, topk_merged_indices, self.batch_size, 3 * self.num_beams, self.num_beams
        )
        next_log_probs = self._gather_beams(
            merged_log_probs, topk_merged_indices, self.batch_size, 3 * self.num_beams, self.num_beams
        )
        next_is_finished = self._gather_beams(
            merged_is_finished, topk_merged_indices, self.batch_size, 3 * self.num_beams, self.num_beams
        )

        # 9. Determine the top k beam indices from the original set of all beams.
        next_running_indices = self._gather_beams(
            topk_beam_indices, next_topk_indices, self.batch_size, 2 * self.num_beams, self.num_beams
        )

        # Convert per-batch-item beam indices into indices over the flattened (batch * beams) dim.
        flat_batch_indices = torch.arange(self.batch_size * self.num_beams) // self.num_beams
        flat_batch_indices = flat_batch_indices * self.num_beams
        beam_indices = self._flatten_beam_dim(next_running_indices, self.num_beams)
        beam_indices = beam_indices + flat_batch_indices

        # Write the new state back to the (float) buffers.
        self.sequences.copy_(next_sequences.float())
        self.running_sequences.copy_(next_running_sequences.float())
        self.log_probs.copy_(next_log_probs)
        self.running_log_probs.copy_(next_running_log_probs)
        self.is_finished.copy_(next_is_finished.float())
        self._cached_beam_idx.copy_(beam_indices.float())

        return OnDeviceGenerationModelOutput(
            generated_tokens=return_sequences,
            done=~continue_search,
        )


class OnDeviceGenerationModel(torch.nn.Module):
    """
    A wrapper around a user generation model that effectively runs the entire generation loop
    on device for a selected number of steps before returning to host. Each generated token is
    stored in appropriate buffers on device.

    We currently support greedy search and beam search. For beam search, the additional state is similarly
    stored in buffers on device.

    Let B = batch size * num beams, C = context length, S = max length.
    The input token tensor is of shape [B, C]. To do D on-device generation steps at a time,
    the device iterations should be set to D, for example D=16.

    If using this class outside of the `IPUGenerationMixin`, the user must ensure generation does not
    exceed the max sequence length of S.

    Current limitations:
    a) context length is restricted to C=1;
    b) user generation model must have KV caching enabled;
    c) poptorch execution strategy must be poptorch.ShardedExecution.
    """

    def __init__(
        self,
        model: torch.nn.Module,
        batch_size: int,
        max_length: int,
        eos_token_id: int,
        pad_token_id: int,
        logits_processor: LogitsProcessorList,
        num_beams: Optional[int] = 1,
        use_cache: Optional[bool] = True,
        length_penalty: Optional[float] = 1.0,
        early_stopping: Optional[Union[bool, str]] = False,
    ):
        """
        Args:
            model: the user generation model wrapped by the selected strategy.
            batch_size: batch size handed to the generation strategy.
            max_length: max sequence length S; the step counter wraps around at this value.
            eos_token_id: EOS token id; a list holding a single id is also accepted.
            pad_token_id: padding token id handed to the generation strategy.
            logits_processor: processors handed to the generation strategy.
            num_beams: 1 selects `OnDeviceGreedySearch`, any other value `OnDeviceBeamSearch`.
            use_cache: must be truthy.
            length_penalty: forwarded to `OnDeviceBeamSearch` only.
            early_stopping: forwarded to `OnDeviceBeamSearch` only.

        Raises:
            NotImplementedError: if `use_cache` is falsy.
            ValueError: if `eos_token_id` is a list with more than one element.
        """
        super().__init__()

        if not use_cache:
            raise NotImplementedError("On device generation assumes `use_cache=True`.")

        if isinstance(eos_token_id, list):
            if len(eos_token_id) > 1:
                raise ValueError("Multiple EOS tokens are not yet supported for on-device generation.")
            eos_token_id = eos_token_id[0]

        self.max_length = max_length
        self.context_length = 1

        if num_beams == 1:
            self.generation_strategy = OnDeviceGreedySearch(
                model,
                batch_size=batch_size,
                max_length=max_length,
                eos_token_id=eos_token_id,
                pad_token_id=pad_token_id,
                logits_processor=logits_processor,
            )
        else:
            self.generation_strategy = OnDeviceBeamSearch(
                model,
                batch_size=batch_size,
                num_beams=num_beams,
                max_length=max_length,
                eos_token_id=eos_token_id,
                pad_token_id=pad_token_id,
                logits_processor=logits_processor,
                length_penalty=length_penalty,
                early_stopping=early_stopping,
            )

        # Step counter of the current generation, kept in a non-persistent buffer.
        # NOTE(review): the strategies store their buffers as float because "Poptorch buffers become
        # constant if kept as int", whereas this buffer is int32 - confirm that is intended.
        self.register_buffer("_generation_step", torch.tensor([0], dtype=torch.int32), persistent=False)

    def _reset_generation_step(self, begin_new_generation: torch.Tensor) -> None:
        """Multiply the step counter by `1 - begin_new_generation`, i.e. zero it where the flag is 1."""
        generation_step = (1 - begin_new_generation) * self._generation_step
        self._generation_step.copy_(generation_step)

    def forward(self, **kwargs):
        """
        Run one generation step through the selected strategy and advance the step counter.

        Args:
            **kwargs: must contain `decoder_input_ids` or `input_ids` (looked up in that order) with a
                last dimension of at most 1. An optional `generation_step` overrides the internal step
                counter. Everything else is forwarded to the generation strategy.

        Returns:
            The `OnDeviceGenerationModelOutput` produced by the generation strategy.

        Raises:
            ValueError: if neither input key is present, or the context length is greater than 1.
            TypeError: if the strategy returns anything but an `OnDeviceGenerationModelOutput`.
        """
        input_ids_key = "decoder_input_ids"
        input_ids = kwargs.pop(input_ids_key, None)
        if input_ids is None:
            input_ids_key = "input_ids"
            input_ids = kwargs.pop(input_ids_key, None)
            if input_ids is None:
                raise ValueError(
                    "The on device generation model was called with kwargs that are missing both `decoder_input_ids` "
                    "and `input_ids`. Please provide one of these as inputs (default is `decoder_input_ids`)."
                )
        if input_ids.shape[-1] > 1:
            raise ValueError("Context length (input_ids.shape[-1]) > 1 is not supported yet.")

        # An explicit `generation_step` overrides the step counter. The pop is kept separate from the
        # `is not None` test: `x := pop(...) is not None` would bind the boolean result of the
        # comparison rather than the popped value.
        generation_step = kwargs.pop("generation_step", None)
        if generation_step is not None:
            self._generation_step.copy_(generation_step)

        # NOTE(review): `absolute_step` is derived before the wrap-around below, so on the iteration
        # where the counter equals `max_length` it is `max_length + 1` - confirm this ordering is intended.
        absolute_step = self._generation_step + self.context_length

        # Make sure generation_step does not go out of bounds.
        self._generation_step.copy_(self._generation_step % self.max_length)

        # Reset on-device state buffers when starting generation anew.
        begin_new_generation = (self._generation_step == 0).int()
        self.generation_strategy.reset_state(begin_new_generation)
        self._reset_generation_step(begin_new_generation)

        outputs = self.generation_strategy(input_ids, absolute_step, **kwargs)
        if not isinstance(outputs, OnDeviceGenerationModelOutput):
            raise TypeError(
                f"Unexpected type {type(outputs)} returned from {self.generation_strategy.__class__.__name__}."
            )

        self._generation_step.copy_(self._generation_step + 1)

        return outputs
