#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import queue
from asyncio import Queue as AsyncQueue, QueueFull as AsyncQueueFull
from collections import deque

from skywalking.protocol.common.Command_pb2 import Commands, Command

from skywalking.command.base_command import BaseCommand
from skywalking.command.executors import noop_command_executor_instance
from skywalking.command.executors.profile_task_command_executor import ProfileTaskCommandExecutor
from skywalking.command.profile_task_command import ProfileTaskCommand
from skywalking.loggings import logger


class CommandService:

    def __init__(self):
        self._commands = queue.Queue()  # type: queue.Queue
        # don't execute same command twice
        self._command_serial_number_cache = CommandSerialNumberCache()

    def dispatch(self):
        while True:
            # block until a command is available
            command = self._commands.get()  # type: BaseCommand
            if not self.__is_command_executed(command):
                command_executor_service.execute(command)
                self._command_serial_number_cache.add(command.serial_number)

    def __is_command_executed(self, command: BaseCommand):
        return self._command_serial_number_cache.contains(command.serial_number)

    def receive_command(self, commands: Commands):
        for command in commands.commands:
            try:
                base_command = CommandDeserializer.deserialize(command)
                logger.debug('received command [{%s} {%s}]', base_command.command, base_command.serial_number)

                if self.__is_command_executed(base_command):
                    logger.warning('command[{%s}] is executed, ignored.', base_command.command)
                    continue

                try:
                    self._commands.put(base_command)
                except queue.Full:
                    logger.warning('command[{%s}, {%s}] cannot add to command list. because the command list is full.',
                                   base_command.command, base_command.serial_number)
            except UnsupportedCommandException as e:
                logger.warning('received unsupported command[{%s}].', e.command.command)


class CommandServiceAsync:

    def __init__(self):
        # don't execute same command twice
        self._command_serial_number_cache = CommandSerialNumberCache()

    async def dispatch(self):
        self._commands = AsyncQueue()  # type: AsyncQueue
        while True:
            # block until a command is available
            command = await self._commands.get()  # type: BaseCommand
            if not self.__is_command_executed(command):
                command_executor_service.execute(command)
                self._command_serial_number_cache.add(command.serial_number)

    def __is_command_executed(self, command: BaseCommand):
        return self._command_serial_number_cache.contains(command.serial_number)

    def receive_command(self, commands: Commands):
        for command in commands.commands:
            try:
                base_command = CommandDeserializer.deserialize(command)
                logger.debug('received command [{%s} {%s}]', base_command.command, base_command.serial_number)

                if self.__is_command_executed(base_command):
                    logger.warning('command[{%s}] is executed, ignored.', base_command.command)
                    continue

                try:
                    self._commands.put_nowait(base_command)
                except AsyncQueueFull:
                    logger.warning(
                        'command[{%s}, {%s}] cannot add to command list. because the command list is full.',
                        base_command.command,
                        base_command.serial_number)
            except UnsupportedCommandException as e:
                logger.warning('received unsupported command[{%s}].', e.command.command)


class CommandSerialNumberCache:

    def __init__(self, maxlen=64):
        self.queue = deque(maxlen=maxlen)

    def add(self, number: str):
        # Once a bounded length deque is full, when new items are added,
        # a corresponding number of items are discarded from the opposite end.
        self.queue.append(number)

    def contains(self, number: str) -> bool:
        try:
            _ = self.queue.index(number)
            return True
        except ValueError:
            return False


class CommandExecutorService:
    """
    route commands to appropriate executor
    """

    def __init__(self):
        self.__command_executor_map = {ProfileTaskCommand.NAME: ProfileTaskCommandExecutor()}

    def execute(self, command: BaseCommand):
        self.__executor_for_command(command).execute(command)

    def __executor_for_command(self, command: BaseCommand):
        executor = self.__command_executor_map.get(command.command)
        if not executor:
            return noop_command_executor_instance
        return executor


class CommandDeserializer:

    @staticmethod
    def deserialize(command: Command) -> BaseCommand:
        command_name = command.command

        if ProfileTaskCommand.NAME == command_name:
            return ProfileTaskCommand.deserialize(command)
        else:
            raise UnsupportedCommandException(command)


class UnsupportedCommandException(Exception):

    def __init__(self, command):
        self.command = command


# init
command_executor_service = CommandExecutorService()
