#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import logging
import sys

from kubernetes import client as kube_client

from graphscope.deploy.kubernetes.utils import parse_readable_memory
from graphscope.framework.utils import get_tempdir

logger = logging.getLogger("graphscope")


class ResourceBuilder:
    @staticmethod
    def get_configmap(name, kvs):
        metadata = kube_client.V1ObjectMeta(name=name)
        configmap = kube_client.V1ConfigMap(metadata=metadata, data=kvs)
        return configmap

    @staticmethod
    def get_role(name, namespace, api_groups, resources, verbs, labels):
        metadata = kube_client.V1ObjectMeta(name=name, namespace=namespace)
        metadata.labels = labels
        rule = kube_client.V1PolicyRule(
            api_groups=api_groups.split(","),
            resources=resources.split(","),
            verbs=verbs.split(","),
        )
        role = kube_client.V1Role(metadata=metadata, rules=[rule])
        return role

    @staticmethod
    def get_cluster_role(name, api_groups, resources, verbs, labels):
        metadata = kube_client.V1ObjectMeta(name=name, labels=labels)
        rule = kube_client.V1PolicyRule(
            api_groups=api_groups.split(","),
            resources=resources.split(","),
            verbs=verbs.split(","),
        )
        role = kube_client.V1ClusterRole(metadata=metadata, rules=[rule])
        return role

    @staticmethod
    def get_role_binding(name, namespace, role_name, service_account_name, labels):
        metadata = kube_client.V1ObjectMeta(name=name, namespace=namespace)
        metadata.labels = labels
        role_ref = kube_client.V1RoleRef(
            kind="Role", name=role_name, api_group="rbac.authorization.k8s.io"
        )
        subject = kube_client.RbacV1Subject(
            kind="ServiceAccount", name=service_account_name, namespace=namespace
        )
        role_binding = kube_client.V1RoleBinding(
            metadata=metadata, role_ref=role_ref, subjects=[subject]
        )
        return role_binding

    @staticmethod
    def get_cluster_role_binding(
        name, namespace, role_name, service_account_name, labels
    ):
        metadata = kube_client.V1ObjectMeta(name=name, labels=labels)
        role_ref = kube_client.V1RoleRef(
            kind="ClusterRole", name=role_name, api_group="rbac.authorization.k8s.io"
        )
        subject = kube_client.RbacV1Subject(
            kind="ServiceAccount", name=service_account_name, namespace=namespace
        )
        role_binding = kube_client.V1ClusterRoleBinding(
            metadata=metadata, role_ref=role_ref, subjects=[subject]
        )
        return role_binding

    @staticmethod
    def get_tcp_probe(port, timeout=15, period=10, failure_threshold=8):
        return kube_client.V1Probe(
            tcp_socket=kube_client.V1TCPSocketAction(port=port),
            timeout_seconds=timeout,
            period_seconds=period,
            failure_threshold=failure_threshold,
        )

    @staticmethod
    def get_exec_action(command):
        return kube_client.V1ExecAction(command=command)

    @staticmethod
    def get_lifecycle_handler(_exec=None, http_get=None, tcp_socket=None):
        handler = kube_client.V1LifecycleHandler(
            _exec=_exec, http_get=http_get, tcp_socket=tcp_socket
        )
        return handler

    @staticmethod
    def get_lifecycle(post_start=None, pre_stop=None):
        return kube_client.V1Lifecycle(post_start=post_start, pre_stop=pre_stop)

    @staticmethod
    def get_image_pull_secrets(image_pull_secrets):
        """
        for name in self._image_pull_secrets:
            engine_builder.add_image_pull_secret(name)
        """
        local_object_refs = []
        for name in image_pull_secrets:
            local_object_refs.append(kube_client.V1LocalObjectReference(name=name))
        return local_object_refs

    @staticmethod
    def get_node_selector(node_selector):
        import base64
        import json

        decoded_node_selector = base64.b64decode(node_selector).decode(
            "utf-8", errors="ignore"
        )
        return json.loads(decoded_node_selector)

    @staticmethod
    def get_user_defined_volumes(udf_volumes):
        """
        .. code:: python

            {
                name: {
                    "type": "",
                    "field": {},  # the keys are subject to volume type
                    "mounts": [ {"mountPath": "", "subPath": ""}, ... ]
                }
            }
        """
        if not udf_volumes:
            return [], [], []
        volumes, source_volume_mounts = [], []
        for name, value in udf_volumes.items():
            volume = kube_client.V1Volume(name=name)
            field = value.get("field", {})
            if value["type"] == "hostPath":
                volume.host_path = kube_client.V1HostPathVolumeSource(
                    path=field["path"]
                )
                if "type" in field:
                    volume.host_path.type = field["type"]
            elif value["type"] == "emptyDir":
                volume.empty_dir = kube_client.V1EmptyDirVolumeSource()
                if "medium" in field:
                    volume.empty_dir.medium = field["medium"]
                if "sizeLimit" in field:
                    volume.empty_dir.size_limit = field["sizeLimit"]
            elif value["type"] == "persistentVolumeClaim":
                pvc = kube_client.V1PersistentVolumeClaimVolumeSource(
                    claim_name=field["claimName"]
                )
                volume.persistent_volume_claim = pvc
                if "readOnly" in field:
                    volume.persistent_volume_claim.read_only = field["readOnly"]
            elif value["type"] == "configMap":
                volume.config_map = kube_client.V1ConfigMapVolumeSource(
                    name=field["name"]
                )
            elif value["type"] == "secret":
                volume.secret = kube_client.V1SecretVolumeSource(
                    secret_name=field["name"]
                )
            else:
                raise ValueError(f"Unsupported volume type: {value['type']}")
            volume_mounts = []
            mounts_list = value["mounts"]
            if not isinstance(mounts_list, list):
                mounts_list = [value["mounts"]]
            for udf_mount in mounts_list:
                volume_mount = kube_client.V1VolumeMount(
                    name=name, mount_path=udf_mount["mountPath"]
                )
                if "subPath" in udf_mount:
                    volume_mount.sub_path = udf_mount["subPath"]
                if "readOnly" in udf_mount:
                    volume_mount.read_only = udf_mount["readOnly"]
                volume_mounts.append(volume_mount)
            volumes.append(volume)
            source_volume_mounts.extend(volume_mounts)
        # Assume destination mounts are the same as source mounts
        destination_volume_mounts = source_volume_mounts
        return volumes, source_volume_mounts, destination_volume_mounts

    @staticmethod
    def get_resources(requests, limits):
        resource_requirements = kube_client.V1ResourceRequirements()
        if requests is not None:
            resource_requirements.requests = requests
        if limits is not None:
            resource_requirements.limits = limits
        return resource_requirements

    @staticmethod
    def get_pod_spec(
        containers: [kube_client.V1Container],
        image_pull_secrets=None,
        node_selector=None,
        volumes=None,
    ):
        pod_spec = kube_client.V1PodSpec(containers=containers)
        if image_pull_secrets is not None and image_pull_secrets:
            pod_spec.image_pull_secrets = ResourceBuilder.get_image_pull_secrets(
                image_pull_secrets
            )
        if node_selector is not None and node_selector:
            pod_spec.node_selector = ResourceBuilder.get_node_selector(node_selector)
        if volumes is not None and volumes:
            pod_spec.volumes = volumes
        return pod_spec

    @staticmethod
    def get_pod_template_spec(
        spec: kube_client.V1PodSpec,
        labels: dict,
        annotations=None,
        default_container=None,
    ):
        pod_template_spec = kube_client.V1PodTemplateSpec()
        pod_template_spec.spec = spec
        if annotations is None:
            annotations = dict()
        if default_container is not None:
            annotations["kubectl.kubernetes.io/default-container"] = default_container
        pod_template_spec.metadata = kube_client.V1ObjectMeta(
            labels=labels, annotations=annotations
        )
        return pod_template_spec

    @staticmethod
    def get_deployment_spec(template, replicas, labels):
        selector = kube_client.V1LabelSelector(match_labels=labels)
        spec = kube_client.V1DeploymentSpec(selector=selector, template=template)
        spec.replicas = replicas
        return spec

    @staticmethod
    def get_deployment(namespace, name, spec, labels):
        deployment = kube_client.V1Deployment()
        deployment.api_version = "apps/v1"
        deployment.kind = "Deployment"
        deployment.metadata = kube_client.V1ObjectMeta(
            name=name, labels=labels, namespace=namespace
        )
        deployment.spec = spec
        return deployment

    @staticmethod
    def get_stateful_set_spec(template, replicas, labels, service_name):
        selector = kube_client.V1LabelSelector(match_labels=labels)
        spec = kube_client.V1StatefulSetSpec(
            selector=selector, template=template, service_name=service_name
        )
        spec.replicas = replicas
        return spec

    @staticmethod
    def get_stateful_set(namespace, name, spec, labels):
        statefulset = kube_client.V1StatefulSet()
        statefulset.api_version = "apps/v1"
        statefulset.kind = "StatefulSet"
        statefulset.metadata = kube_client.V1ObjectMeta(
            name=name, labels=labels, namespace=namespace
        )
        statefulset.spec = spec
        return statefulset

    @staticmethod
    def get_value_from_field_ref(name, field_path):
        env = kube_client.V1EnvVar(name=name)
        value_from = kube_client.V1EnvVarSource()
        value_from.field_ref = kube_client.V1ObjectFieldSelector(field_path=field_path)
        env.value_from = value_from
        return env

    @staticmethod
    def get_namespace(name):
        namespace = kube_client.V1Namespace()
        namespace.metadata = kube_client.V1ObjectMeta(name=name)
        namespace.metadata.labels = {"kubernetes.io/metadata.name": name}
        return namespace

    @staticmethod
    def get_service_spec(type, ports, labels, external_traffic_policy):
        service_spec = kube_client.V1ServiceSpec()
        service_spec.type = type
        service_spec.selector = labels
        service_spec.ports = ports
        if external_traffic_policy is not None:
            service_spec.external_traffic_policy = external_traffic_policy
        return service_spec

    @staticmethod
    def get_service(namespace, name, service_spec, labels, annotations=None):
        service = kube_client.V1Service()
        service.api_version = "v1"
        service.kind = "Service"
        service.spec = service_spec
        metadata = kube_client.V1ObjectMeta(
            namespace=namespace, name=name, labels=labels, annotations=annotations
        )
        service.metadata = metadata
        return service


class CoordinatorDeployment:
    def __init__(
        self,
        namespace,
        name,
        image,
        args,
        labels,
        image_pull_secret,
        image_pull_policy,
        node_selector,
        env,
        host_network,
        port=None,
    ):
        self._replicas = 1
        self._namespace = namespace
        self._name = name
        self._image = image
        self._args = args
        self._labels = labels
        self._image_pull_policy = image_pull_policy
        self._image_pull_secret = image_pull_secret
        self._env: dict = env
        self._port = port
        self._host_network = host_network
        self._node_selector = node_selector
        self._requests = {"cpu": 0.5, "memory": "512Mi"}
        self._limits = {"cpu": 0.5, "memory": "512Mi"}

    def get_lifecycle(self):
        pre_stop = ["python3", "-m", "gscoordinator.hook.prestop"]
        _exec = ResourceBuilder.get_exec_action(pre_stop)
        lifecycle_handler = ResourceBuilder.get_lifecycle_handler(_exec)
        lifecycle = ResourceBuilder.get_lifecycle(pre_stop=lifecycle_handler)
        return lifecycle

    def get_coordinator_container(self):
        resources = ResourceBuilder.get_resources(self._requests, self._limits)
        lifecycle = self.get_lifecycle()
        env = [
            kube_client.V1EnvVar(name=key, value=value)
            for key, value in self._env.items()
        ]
        container = kube_client.V1Container(
            name="coordinator",
            image=self._image,
            image_pull_policy=self._image_pull_policy,
            args=self._args,
            resources=resources,
            lifecycle=lifecycle,
            env=env,
        )

        if self._port is not None:
            container_ports = [kube_client.V1ContainerPort(container_port=self._port)]
            container_ports.append(kube_client.V1ContainerPort(container_port=8000))
            container.ports = container_ports
            container.readiness_probe = ResourceBuilder.get_tcp_probe(
                port=self._port, timeout=15, period=1, failure_threshold=20
            )
        return container

    def get_coordinator_pod_spec(self):
        container = self.get_coordinator_container()
        pod_spec = ResourceBuilder.get_pod_spec(
            containers=[container],
            image_pull_secrets=self._image_pull_secret,
            node_selector=self._node_selector,
        )
        pod_spec.host_network = self._host_network
        return pod_spec

    def get_coordinator_pod_template_spec(self):
        spec = self.get_coordinator_pod_spec()
        return ResourceBuilder.get_pod_template_spec(
            spec, self._labels, default_container="coordinator"
        )

    def get_coordinator_deployment_spec(self, replicas):
        template = self.get_coordinator_pod_template_spec()
        spec = ResourceBuilder.get_deployment_spec(template, replicas, self._labels)
        return spec

    def get_coordinator_deployment(self):
        spec = self.get_coordinator_deployment_spec(self._replicas)
        return ResourceBuilder.get_deployment(
            self._namespace, self._name, spec, self._labels
        )

    def get_coordinator_service(self, service_type, port):
        ports = [kube_client.V1ServicePort(name="coordinator", port=port)]
        ports.append(kube_client.V1ServicePort(name="debug", port=8000))
        service_spec = ResourceBuilder.get_service_spec(
            service_type, ports, self._labels, None
        )
        service = ResourceBuilder.get_service(
            self._namespace, self._name, service_spec, self._labels
        )
        return service
