#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import base64
import copy
import logging
import os
import queue
import random
import time

from kubernetes import client as kube_client
from kubernetes.client import CoreV1Api
from kubernetes.client.rest import ApiException as K8SApiException

from graphscope.config import Config
from graphscope.deploy.kubernetes.resource_builder import CoordinatorDeployment
from graphscope.deploy.kubernetes.resource_builder import ResourceBuilder
from graphscope.deploy.kubernetes.utils import KubernetesPodWatcher
from graphscope.deploy.kubernetes.utils import delete_kubernetes_object
from graphscope.deploy.kubernetes.utils import get_service_endpoints
from graphscope.deploy.kubernetes.utils import try_to_read_namespace_from_context
from graphscope.deploy.kubernetes.utils import wait_for_deployment_complete
from graphscope.deploy.launcher import Launcher
from graphscope.framework.utils import random_string
from graphscope.version import __version__

logger = logging.getLogger("graphscope")


class KubernetesClusterLauncher(Launcher):
    """Class for setting up GraphScope instance on kubernetes cluster.

    The launcher creates (when missing) a namespace, RBAC objects bound to the
    namespace's ``default`` service account, and a coordinator Deployment plus
    Service. Every object it creates is recorded in ``self._resource_object``
    so that :meth:`stop` can delete them again in reverse creation order.
    """

    _coordinator_name_prefix = "coordinator-"

    _role_name_prefix = "gs-reader-"
    _role_binding_name_prefix = f"{_role_name_prefix}binding-"
    _cluster_role_name_prefix = "gs-cluster-reader-"
    _cluster_role_binding_name_prefix = f"{_cluster_role_name_prefix}binding-"

    _url_pattern = r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'\".,<>?«»“”‘’]))"  # noqa: E501
    _endpoint_pattern = r"(?:http.*://)?(?P<host>[^:/ ]+).?(?P<port>[0-9]*).*"

    _coordinator_container_name = "coordinator"
    _coordinator_service_port_name = "coordinator"

    def __init__(
        self,
        config: Config,
        api_client: kube_client.ApiClient,
    ):
        """Prepare names and API handles for one GraphScope instance.

        No kubernetes objects are created here; that happens in :meth:`start`.

        Args:
            config: GraphScope configuration. It is deep-copied so that derived
                values (deployment name, resolved namespace, ...) can be
                written back without mutating the caller's object.
            api_client: Kubernetes API client used for every API call.
        """
        super().__init__()
        self._config = copy.deepcopy(config)
        self._api_client = api_client
        self._core_api = kube_client.CoreV1Api(api_client)
        self._app_api = kube_client.AppsV1Api(api_client)
        self._rbac_api = kube_client.RbacAuthorizationV1Api(api_client)

        self._service_type = config.kubernetes_launcher.service_type
        self._registry = config.kubernetes_launcher.image.registry
        self._repository = config.kubernetes_launcher.image.repository
        self._tag = config.kubernetes_launcher.image.tag
        self._image_pull_policy = config.kubernetes_launcher.image.pull_policy
        self._image_pull_secrets = config.kubernetes_launcher.image.pull_secrets

        self._instance_id = config.session.instance_id
        self._role_name = self._role_name_prefix + self._instance_id
        self._role_binding_name = self._role_binding_name_prefix + self._instance_id
        # Cluster-scoped names depend on the namespace; filled in by
        # _create_role_and_binding().
        self._cluster_role_name = ""
        self._cluster_role_binding_name = ""

        # all resource created inside namespace, in creation order
        self._resource_object = []

        self._coordinator_name = self._coordinator_name_prefix + self._instance_id
        self._coordinator_service_name = self._coordinator_name

        self._config.coordinator.deployment_name = self._coordinator_name

        self._namespace = config.kubernetes_launcher.namespace
        if self._namespace is None:
            self._namespace = try_to_read_namespace_from_context()
            # Doesn't have any namespace info in kube context.
            if self._namespace is None:
                self._namespace = self._get_free_namespace()
            self._config.kubernetes_launcher.namespace = self._namespace
        self._closed = False

        # pods watcher
        self._coordinator_pods_watcher = None
        self._logs = []

        self._labels = {
            "app.kubernetes.io/name": "graphscope",
            "app.kubernetes.io/instance": self._instance_id,
            "app.kubernetes.io/version": __version__,
            "app.kubernetes.io/component": "coordinator",
        }

    def __del__(self):
        # Best-effort cleanup on garbage collection. __del__ may run on a
        # partially initialised object (when __init__ raised) or during
        # interpreter shutdown, so failures are logged rather than raised out
        # of the finalizer.
        try:
            self.stop()
        except Exception as e:  # pylint: disable=broad-except
            logger.warning("Failed to stop kubernetes launcher on deletion: %s", e)

    def poll(self):
        """Check the coordinator pod status, 0 for success.

        Currently always reports success.
        """
        return 0

    def get_namespace(self):
        """Get kubernetes namespace which graphscope instance running on.

        Returns:
            str: Kubernetes namespace.
        """
        return self._namespace

    def type(self):
        """Return the launcher type identifier (``"k8s"``)."""
        return "k8s"

    def _get_free_namespace(self):
        """Return a random ``gs-XXXXXX`` namespace name that does not exist yet."""
        while True:
            namespace = "gs-" + random_string(6)
            if not self._namespace_exist(namespace):
                return namespace

    def _resource_exist(self, func, *args):
        """Call a ``read_*`` API and report whether the object exists.

        Returns False on HTTP 404; any other API error is re-raised.
        """
        try:
            func(*args)
        except K8SApiException as e:
            if e.status != 404:  # Not found
                raise
            return False
        return True

    def _namespace_exist(self, namespace):
        return self._resource_exist(self._core_api.read_namespace, namespace)

    def _role_exist(self, namespace, role):
        return self._resource_exist(
            self._rbac_api.read_namespaced_role, role, namespace
        )

    def _cluster_role_exist(self, cluster_role):
        return self._resource_exist(self._rbac_api.read_cluster_role, cluster_role)

    def _role_binding_exist(self, namespace, role_binding):
        return self._resource_exist(
            self._rbac_api.read_namespaced_role_binding, role_binding, namespace
        )

    def _cluster_role_binding_exist(self, cluster_role_binding):
        return self._resource_exist(
            self._rbac_api.read_cluster_role_binding, cluster_role_binding
        )

    def _create_namespace(self):
        """Create the target namespace if missing.

        A namespace created here is owned by the launcher, so it is flagged
        for deletion in :meth:`stop`.
        """
        if not self._namespace_exist(self._namespace):
            namespace = ResourceBuilder.get_namespace(self._namespace)
            self._core_api.create_namespace(namespace)
            self._config.kubernetes_launcher.delete_namespace = True

    def _create_role_and_binding(self):
        """Grant the namespace's ``default`` service account the permissions
        the coordinator needs, creating only the RBAC objects that are missing.
        """
        self._cluster_role_name = self._cluster_role_name_prefix + self._namespace
        self._cluster_role_binding_name = (
            self._cluster_role_binding_name_prefix + self._namespace
        )
        # create a role and bind to default service account.
        targets = []
        if not self._role_exist(namespace=self._namespace, role=self._role_name):
            role = ResourceBuilder.get_role(
                name=self._role_name,
                namespace=self._namespace,
                api_groups=",apps,extensions,kubeflow.org",  # The leading comma is necessary, represents for core api group.
                resources="configmaps,deployments,deployments/status,statefulsets,statefulsets/status,endpoints,events,pods,pods/log,pods/exec,pods/status,services,replicasets,pytorchjobs",  # noqa: E501
                verbs="create,delete,get,update,watch,list",
                labels=self._labels,
            )
            ret = self._rbac_api.create_namespaced_role(self._namespace, role)
            targets.append(ret)

        if not self._role_binding_exist(self._namespace, self._role_binding_name):
            role_binding = ResourceBuilder.get_role_binding(
                name=self._role_binding_name,
                namespace=self._namespace,
                role_name=self._role_name,
                service_account_name="default",
                labels=self._labels,
            )
            ret = self._rbac_api.create_namespaced_role_binding(
                self._namespace, role_binding
            )
            targets.append(ret)

        if self._config.kubernetes_launcher.delete_namespace:
            # Create clusterRole to delete namespace.
            if not self._cluster_role_exist(cluster_role=self._cluster_role_name):
                # NOTE(review): `namespaces` live in the core API group (""),
                # not "apps"; verify this rule actually grants namespace
                # deletion (the role above uses a leading comma for core).
                cluster_role = ResourceBuilder.get_cluster_role(
                    name=self._cluster_role_name,
                    api_groups="apps",
                    resources="namespaces",
                    verbs="create,delete,get,update,watch,list",
                    labels=self._labels,
                )
                ret = self._rbac_api.create_cluster_role(cluster_role)
                targets.append(ret)

            if not self._cluster_role_binding_exist(
                cluster_role_binding=self._cluster_role_binding_name
            ):
                cluster_role_binding = ResourceBuilder.get_cluster_role_binding(
                    name=self._cluster_role_binding_name,
                    namespace=self._namespace,
                    role_name=self._cluster_role_name,
                    service_account_name="default",
                    labels=self._labels,
                )
                ret = self._rbac_api.create_cluster_role_binding(cluster_role_binding)
                targets.append(ret)
        self._resource_object.extend(targets)

    def _create_coordinator(self):
        """Create the coordinator Deployment and its Service, recording both."""
        logger.info("Launching coordinator...")
        targets = []

        env = {
            "PYTHONUNBUFFERED": "TRUE",
            "KUBE_NAMESPACE": self._namespace,
            "INSTANCE_ID": self._instance_id,
            "GREMLIN_EXPOSE": self._service_type,
        }
        if "KUBE_API_ADDRESS" in os.environ:
            env["KUBE_API_ADDRESS"] = os.environ["KUBE_API_ADDRESS"]
        if self._registry:
            image_prefix = f"{self._registry}/{self._repository}"
        else:
            image_prefix = self._repository
        image = f"{image_prefix}/coordinator:{self._tag}"
        args = self._get_coordinator_args()

        # Host networking is enabled by the mere presence of the variable,
        # regardless of its value.
        host_network = "ENABLE_HOST_NETWORK" in os.environ
        node_selector = self._config.coordinator.node_selector
        port = self._config.coordinator.service_port

        coordinator = CoordinatorDeployment(
            namespace=self._namespace,
            name=self._coordinator_name,
            image=image,
            args=args,
            labels=self._labels,
            image_pull_secret=self._image_pull_secrets,
            image_pull_policy=self._image_pull_policy,
            node_selector=node_selector,
            env=env,
            host_network=host_network,
            port=port,
        )

        deployment = coordinator.get_coordinator_deployment()
        response = self._app_api.create_namespaced_deployment(
            self._namespace, deployment
        )
        targets.append(response)

        # create coordinator service
        service = coordinator.get_coordinator_service(
            service_type=self._service_type, port=port
        )
        response = self._core_api.create_namespaced_service(self._namespace, service)
        targets.append(response)

        self._resource_object.extend(targets)

    def base64_encode(self, string):
        """Return ``string`` UTF-8 encoded and then base64 encoded, as ``str``."""
        return base64.b64encode(string.encode("utf-8")).decode("utf-8", errors="ignore")

    def _get_coordinator_args(self):
        """Build the coordinator container command line.

        The full config is passed as base64-encoded JSON in a single argument.
        """
        args = [
            "python3",
            "-m",
            "gscoordinator",
            "--config",
            self.base64_encode(self._config.dumps_json()),
        ]
        return args

    def _create_services(self):
        self._create_coordinator()

    def _wait_for_coordinator_pod(self, selector):
        """Poll until the coordinator deployment has created its pod.

        The pod is created asynchronously after the Deployment, so it may not
        exist yet right after creation.

        Raises:
            RuntimeError: If more than one pod matches ``selector``.
            TimeoutError: If no pod appears within ``session.timeout_seconds``.
        """
        start_time = time.time()
        while True:
            pods = self._core_api.list_namespaced_pod(
                namespace=self._namespace, label_selector=selector
            )
            if len(pods.items) == 1:
                return pods.items[0]
            if len(pods.items) > 1:
                raise RuntimeError(
                    "coordinator deployment should have only one pod, "
                    f"found {len(pods.items)}"
                )
            if time.time() - start_time > self._config.session.timeout_seconds:
                raise TimeoutError("Waiting for coordinator pod to be created timeout")
            time.sleep(1)

    def _waiting_for_services_ready(self):
        """Attach a log watcher to the coordinator pod and wait for the rollout."""
        response = self._app_api.read_namespaced_deployment_status(
            namespace=self._namespace, name=self._coordinator_name
        )

        # get deployment pods
        match_labels = response.spec.selector.match_labels
        selector = ",".join([f"{k}={v}" for k, v in match_labels.items()])
        pod = self._wait_for_coordinator_pod(selector)
        self._coordinator_pods_watcher = KubernetesPodWatcher(
            api_client=self._api_client,
            namespace=self._namespace,
            pod=pod,
            container=self._coordinator_container_name,
        )
        self._coordinator_pods_watcher.start()

        if wait_for_deployment_complete(
            api_client=self._api_client,
            namespace=self._namespace,
            name=self._coordinator_name,
            pods_watcher=self._coordinator_pods_watcher,
            timeout_seconds=self._config.session.timeout_seconds,
        ):
            self._coordinator_pods_watcher.stop()

    def _try_to_get_coordinator_service_from_configmap(self):
        """Poll the ``gs-coordinator-<instance_id>`` ConfigMap for ``ip:port``.

        Raises:
            TimeoutError: If the ConfigMap does not provide both ``ip`` and
                ``port`` within ``session.timeout_seconds``. The last API
                error seen, if any, is chained as the cause.
        """
        config_map_name = f"gs-coordinator-{self._instance_id}"
        start_time = time.time()
        last_error = None
        while True:
            try:
                response = self._core_api.read_namespaced_config_map(
                    name=config_map_name, namespace=self._namespace
                )
            except K8SApiException as e:
                # Not created yet, or a transient API failure: keep polling.
                last_error = e
            else:
                # The ConfigMap may exist before its data is filled in.
                data = response.data or {}
                if "ip" in data and "port" in data:
                    return f"{data['ip']}:{data['port']}"
            time.sleep(1)
            if time.time() - start_time > self._config.session.timeout_seconds:
                raise TimeoutError(
                    "Get coordinator service from configmap timeout"
                ) from last_error

    def _get_coordinator_endpoint(self):
        """Return the coordinator endpoint.

        Read from the ConfigMap when no service type is configured, otherwise
        from the coordinator Service.
        """
        if self._service_type is None:
            # try to get endpoint from configmap
            return self._try_to_get_coordinator_service_from_configmap()

        # Always len(endpoints) >= 1
        endpoints = get_service_endpoints(
            api_client=self._api_client,
            namespace=self._namespace,
            name=self._coordinator_service_name,
            service_type=self._service_type,
        )

        return endpoints[0]

    def _dump_coordinator_failed_status(self):
        """Drain buffered coordinator pod messages to the error log and stop
        the watcher."""
        # Dump failed status even show_log is False
        if self._coordinator_pods_watcher is None:
            return
        while True:
            try:
                message = self._coordinator_pods_watcher.poll(timeout_seconds=3)
                logger.error(message, extra={"simple": True})
            except queue.Empty:
                break
        self._coordinator_pods_watcher.stop()
        self._coordinator_pods_watcher = None

    def start(self):
        """Launch graphscope instance on kubernetes cluster.

        On any failure the coordinator's messages are dumped, created
        resources are cleaned up, and the original exception is re-raised.

        Raises:
            RuntimeError: If instance launch failed or timeout.

        Returns:
            str: Coordinator service endpoint.
        """
        try:
            self._create_namespace()
            self._create_role_and_binding()

            self._create_services()
            time.sleep(1)

            self._waiting_for_services_ready()

            self._coordinator_endpoint = self._get_coordinator_endpoint()
            logger.info(
                "Coordinator pod start successful with address %s, connecting to service ...",
                self._coordinator_endpoint,
            )
        except Exception:
            time.sleep(1)
            self._dump_coordinator_failed_status()
            try:
                self.stop()
            except Exception:  # pylint: disable=broad-except
                # Keep the launch failure as the raised error; a cleanup
                # failure is only logged so it does not mask the real cause.
                logger.exception("Failed to clean up after launch failure")
            raise
        return self._coordinator_endpoint

    def stop(self, wait=False):
        """Stop graphscope instance on kubernetes cluster.

        Deletes every recorded object in reverse creation order, then the
        namespace if the launcher owns it. Calling this more than once is
        harmless: the recorded objects and the namespace-deletion flag are
        cleared after use.

        Args:
            wait (bool): Wait for each object to be deleted. Waiting is also
                enabled by ``config.kubernetes_launcher.waiting_for_delete``.

        Raises:
            TimeoutError:
                Waiting for stop instance timeout when ``wait`` or ``_waiting_for_delete`` is True.
        """
        # delete resources created by graphscope inside namespace
        # make sure delete permission resources in the end
        logger.info("Stopping coordinator")
        wait = wait or self._config.kubernetes_launcher.waiting_for_delete
        for target in reversed(self._resource_object):
            delete_kubernetes_object(
                api_client=self._api_client,
                target=target,
                wait=wait,
                timeout_seconds=self._config.session.timeout_seconds,
            )
        self._resource_object = []
        if self._config.kubernetes_launcher.delete_namespace:
            # delete namespace
            try:
                self._core_api.delete_namespace(self._namespace)
            except K8SApiException as e:
                if e.status != 404:  # 404: namespace already deleted.
                    raise
            # Deleted now or already gone: don't try again on a later stop().
            self._config.kubernetes_launcher.delete_namespace = False
        logger.info("Stopped coordinator")


if __name__ == "__main__":
    # Manual smoke test: launch a coordinator in the "demo" namespace using the
    # local kubeconfig, print its endpoint, and always tear everything down.
    from kubernetes import config as kube_config

    kube_config.load_kube_config()
    client = kube_client.ApiClient()

    config = Config()
    config.kubernetes_launcher.namespace = "demo"
    config.kubernetes_launcher.service_type = "NodePort"
    config.session.num_workers = 2

    launcher = KubernetesClusterLauncher(
        config=config,
        api_client=client,
    )
    try:
        launcher.start()
        print(launcher._get_coordinator_endpoint())
    finally:
        # Runs even if start() or the endpoint lookup raised; stop() is safe
        # to call again after start() has already cleaned up on failure.
        launcher.stop()
