#! /usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import base64
import copy
import json
import logging
import os
import random
import shlex
import subprocess
import sys
import time

from graphscope.proto import message_pb2

from gscoordinator.cluster_builder import EngineCluster
from gscoordinator.cluster_builder import MarsCluster

try:
    from kubernetes import client as kube_client
    from kubernetes import config as kube_config
    from kubernetes import watch as kube_watch
    from kubernetes.client import AppsV1Api
    from kubernetes.client import CoreV1Api
    from kubernetes.client.rest import ApiException as K8SApiException
    from kubernetes.config import ConfigException as K8SConfigException
except ImportError:
    kube_client = None
    kube_config = None
    kube_watch = None
    AppsV1Api = None
    CoreV1Api = None
    K8SApiException = None
    K8SConfigException = None

from graphscope.config import Config
from graphscope.deploy.kubernetes.utils import delete_kubernetes_object
from graphscope.deploy.kubernetes.utils import get_kubernetes_object_info
from graphscope.deploy.kubernetes.utils import resolve_api_client
from graphscope.framework.utils import PipeWatcher
from graphscope.framework.utils import get_tempdir
from graphscope.proto import types_pb2

from gscoordinator.constants import ANALYTICAL_CONTAINER_NAME
from gscoordinator.constants import GRAPHLEARN_CONTAINER_NAME
from gscoordinator.constants import GRAPHLEARN_TORCH_CONTAINER_NAME
from gscoordinator.constants import INTERACTIVE_EXECUTOR_CONTAINER_NAME
from gscoordinator.launcher import AbstractLauncher
from gscoordinator.utils import ANALYTICAL_ENGINE_PATH
from gscoordinator.utils import GRAPHSCOPE_HOME
from gscoordinator.utils import INTERACTIVE_ENGINE_SCRIPT
from gscoordinator.utils import WORKSPACE
from gscoordinator.utils import ResolveMPICmdPrefix
from gscoordinator.utils import delegate_command_to_pod
from gscoordinator.utils import parse_as_glog_level
from gscoordinator.utils import replace_string_in_dict
from gscoordinator.utils import run_kube_cp_command

logger = logging.getLogger("graphscope")


class FakeKubeResponse:
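    """A minimal stand-in for a kubernetes HTTP response: it only carries a ``data``
    attribute holding the serialized JSON, which is all that ``ApiClient.deserialize``
    reads when rebuilding a kubernetes model object from a plain dict."""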
    def __init__(self, obj):
        self.data = json.dumps(obj)


class KubernetesClusterLauncher(AbstractLauncher):
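    """Launch and manage GraphScope engines (analytical, interactive and learning)
    as pods on a Kubernetes cluster, together with their services and an optional
    external vineyard deployment.

    A minimal usage sketch (assuming a fully populated ``Config``):

        launcher = KubernetesClusterLauncher(config)
        launcher.start()   # create engine pods and services (eager mode)
        ...                # create analytical/interactive/learning instances
        launcher.stop()    # delete all created kubernetes resources
    """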
    def __init__(self, config: Config):
        super().__init__()
        self._serving = False

        self._api_client = resolve_api_client()
        self._core_api = kube_client.CoreV1Api(self._api_client)
        self._apps_api = kube_client.AppsV1Api(self._api_client)
        self._pytorchjobs_api = kube_client.CustomObjectsApi(self._api_client)
        self._resource_object = ResourceManager(self._api_client)

        self._config: Config = config
        self._config.kubernetes_launcher.engine.post_setup()
        launcher_config = config.kubernetes_launcher

        # glog level
        self._glog_level = parse_as_glog_level(config.log_level)

        # Session Config
        self._num_workers = config.session.num_workers
        self._instance_id = config.session.instance_id
        self._timeout_seconds = config.session.timeout_seconds
        self._retry_time_seconds = config.session.retry_time_seconds

        # Vineyard Config
        # self._vineyard_socket = config.vineyard.socket
        self._vineyard_rpc_port = config.vineyard.rpc_port
        self._vineyard_deployment = config.vineyard.deployment_name

        # Launcher Config
        self._namespace = launcher_config.namespace
        self._delete_namespace = launcher_config.delete_namespace

        # Coordinator Config
        self._coordinator_name = config.coordinator.deployment_name
        self._coordinator_service_name = self._coordinator_name

        self._image_registry = launcher_config.image.registry
        self._image_repository = launcher_config.image.repository
        self._image_tag = launcher_config.image.tag
        self._image_pull_policy = launcher_config.image.pull_policy
        self._image_pull_secrets = launcher_config.image.pull_secrets

        self._vineyard_resource = config.vineyard.resource

        self._volumes = launcher_config.volumes

        self._owner_references = self.get_coordinator_owner_references()

        self._engine_pod_prefix = "gs-engine-"

        self._vineyard_image = config.vineyard.image
        self._vineyard_mem = config.vineyard.resource.requests.memory
        self._vineyard_cpu = config.vineyard.resource.requests.cpu

        self._service_type = launcher_config.service_type

        self._waiting_for_delete = launcher_config.waiting_for_delete

        # check the validity of deploy mode
        self._deploy_mode = launcher_config.deployment_mode
        if self._deploy_mode not in ["eager", "lazy"]:
            logger.error(
                "Invalid mode %s, choose from 'eager' or 'lazy'. Proceeding with default mode: 'eager'",
                self._deploy_mode,
            )
            self._deploy_mode = "eager"

        self._vineyard_pod_name_list = []

        # set the kube config file
        self._k8s_config_file = launcher_config.config_file
        if self._k8s_config_file is None:
            self._k8s_config_file = os.environ.get("KUBECONFIG", "~/.kube/config")

        if self._vineyard_deployment is not None:
            self._deploy_vineyard_deployment_if_not_exist()
            # check again whether the vineyard deployment is ready
            if not self._check_if_vineyard_deployment_exist():
                # if not ready, fall back to running without an external vineyard deployment
                logger.error(
                    "Vineyard deployment %s is not ready, please check the deployment status. "
                    "Proceeding without an external vineyard deployment.",
                    self._vineyard_deployment,
                )
                self._vineyard_deployment = None

        # Lazy mode requires an external vineyard deployment: without one, each
        # engine statefulset gets its own vineyard sidecar and the vineyard
        # objects cannot be shared between engine pods, so report an error and
        # fall back to eager mode.
        if self._deploy_mode == "lazy" and self._vineyard_deployment is None:
            logger.error(
                "Lazy mode is only possible with a vineyard deployment, "
                "please add a vineyard deployment name by k8s_vineyard_deployment='vineyardd-sample'. "
                "Proceeding with default mode: 'eager'"
            )
            self._deploy_mode = "eager"

        self._pod_name_list = []
        self._pod_ip_list = []
        self._pod_host_ip_list = []

        # analytical engine
        self._analytical_pod_name = []
        self._analytical_pod_ip = []
        self._analytical_pod_host_ip = []
        # analytical java engine
        self._analytical_java_pod_name = []
        self._analytical_java_pod_ip = []
        self._analytical_java_pod_host_ip = []
        # interactive engine
        self._interactive_resource_object = {}
        self._interactive_pod_name = {}
        self._interactive_pod_ip = {}
        self._interactive_pod_host_ip = {}
        # graphlearn engine
        self._graphlearn_resource_object = {}
        self._graphlearn_pod_name = {}
        self._graphlearn_pod_ip = {}
        self._graphlearn_pod_host_ip = {}
        # graphlearn_torch engine
        self._graphlearn_torch_resource_object = {}
        self._graphlearn_torch_pod_name = {}
        self._graphlearn_torch_pod_ip = {}
        self._graphlearn_torch_pod_host_ip = {}

        self._analytical_engine_endpoint = None
        self._mars_service_endpoint = None

        self._analytical_engine_process = None
        self._random_analytical_engine_rpc_port = random.randint(56001, 57000)
        # interactive engine: base port, from which the executor inter-processing
        # port, executor rpc port, frontend gremlin port and frontend cypher port
        # are derived
        self._interactive_port = 8233
        # ports 8000 ~ 9000 are exposed
        self._graphlearn_start_port = 8000
        # ports 9001 ~ 10001 are exposed
        self._graphlearn_torch_start_port = 9001

        self._graphlearn_services = {}
        self._graphlearn_instance_processes = {}

        self._graphlearn_torch_services = {}
        self._graphlearn_torch_instance_processes = {}

        # workspace
        self._instance_workspace = os.path.join(WORKSPACE, self._instance_id)
        os.makedirs(self._instance_workspace, exist_ok=True)
        self._session_workspace = None

        self._engine_cluster = self._build_engine_cluster()
        self._vineyard_socket = self._engine_cluster.vineyard_ipc_socket

        self._vineyard_service_endpoint = None
        self._vineyard_internal_endpoint = None
        self._mars_service_endpoint = None
        if self._config.kubernetes_launcher.mars.enable:
            self._mars_cluster = MarsCluster(
                self._instance_id, self._namespace, self._service_type
            )

    def __del__(self):
        self.stop()

    def type(self):
        return types_pb2.K8S

    # Build the engine cluster descriptor from the current config. Which engine
    # containers (analytical, analytical-java, interactive, learning) are added to
    # the engine statefulset is controlled by the enable_* flags in the launcher
    # config.
    def _build_engine_cluster(self):
        return EngineCluster(
            config=self._config,
            engine_pod_prefix=self._engine_pod_prefix,
            graphlearn_start_port=self._graphlearn_start_port,
            graphlearn_torch_start_port=self._graphlearn_torch_start_port,
        )

    def get_coordinator_owner_references(self):
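        """Build an owner reference list pointing at the coordinator deployment so
        that resources created by this launcher are garbage-collected by Kubernetes
        when the coordinator is deleted."""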
        owner_references = []
        if self._coordinator_name:
            try:
                deployment = self._apps_api.read_namespaced_deployment(
                    self._coordinator_name, self._namespace
                )
                owner_references.append(
                    kube_client.V1OwnerReference(
                        api_version="apps/v1",
                        kind="Deployment",
                        name=self._coordinator_name,
                        uid=deployment.metadata.uid,
                    )
                )
            except K8SApiException:
                logger.error("Coordinator %s not found", self._coordinator_name)

        return owner_references

    def waiting_for_delete(self):
        return self._waiting_for_delete

    def get_namespace(self):
        return self._namespace

    def get_vineyard_stream_info(self):
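        """Return the deploy type ("kubernetes") and a list of
        "<namespace>:<pod_name>" entries for the pods hosting vineyard."""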
        if self._vineyard_deployment is not None:
            hosts = [
                f"{self._namespace}:{host}" for host in self._vineyard_pod_name_list
            ]
        else:
            hosts = [f"{self._namespace}:{host}" for host in self._pod_name_list]
        return "kubernetes", hosts

    def set_session_workspace(self, session_id):
        self._session_workspace = os.path.join(self._instance_workspace, session_id)
        os.makedirs(self._session_workspace, exist_ok=True)

    def launch_etcd(self):
        pass

    def configure_etcd_endpoint(self):
        pass

    @property
    def hosts(self):
        """list of pod name"""
        return self._pod_name_list

    @property
    def hosts_list(self):
        return self._get_analytical_hosts()

    @property
    def vineyard_endpoint(self) -> str:
        if self._check_if_vineyard_deployment_exist():
            return self._vineyard_service_endpoint
        else:
            return self._vineyard_internal_endpoint

    def distribute_file(self, path):
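        """Copy a library file to the same path inside every analytical engine pod,
        skipping pods where the file already exists."""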
        pod_name_list, _, _ = self._allocate_analytical_engine()
        for pod in pod_name_list:
            container = ANALYTICAL_CONTAINER_NAME
            try:
                # The library may exist in the analytical pod.
                test_cmd = f"test -f {path}"
                logger.debug(delegate_command_to_pod(test_cmd, pod, container))
                logger.info("Library exists, skip distribute")
            except RuntimeError:
                cmd = f"mkdir -p {os.path.dirname(path)}"
                logger.debug(delegate_command_to_pod(cmd, pod, container))
                logger.debug(run_kube_cp_command(path, path, pod, container, True))

    def close_analytical_instance(self):
        pass

    def launch_vineyard(self):
        """Launch vineyardd in k8s cluster."""
        # vineyardd is auto launched in vineyardd container
        # args = f"vineyardd \
        #  -socket {self._engine_cluster._sock} -etcd_endpoint http://{self._pod_ip_list[0]}:2379"
        pass

    def close_etcd(self):
        # etcd is managed by vineyard
        pass

    def close_vineyard(self):
        # No need to close vineyardd
        # Use delete deployment instead
        pass

    def check_if_engine_exist(self, engine_type, object_id=None):
        """Checks if the engine with the given type exists.

        Args:
            engine_type: The type of engine to check for.
            object_id: The object id of the engine to check for.

        Returns:
            True if the engine exists, False otherwise.
        """

        # attribute names use underscores while engine_type may contain hyphens,
        # e.g. "analytical-java" maps to self._analytical_java_pod_name
        engine_type = engine_type.replace("-", "_")
        if object_id:
            engine_pod_name_dict = getattr(self, f"_{engine_type}_pod_name")
            engine_pod_name_list = engine_pod_name_dict.get(object_id, [])
            engine_pod_ip_dict = getattr(self, f"_{engine_type}_pod_ip")
            engine_pod_ip_list = engine_pod_ip_dict.get(object_id, [])
            engine_pod_host_ip_dict = getattr(self, f"_{engine_type}_pod_host_ip")
            engine_pod_host_ip_list = engine_pod_host_ip_dict.get(object_id, [])
        else:
            engine_pod_name_list = getattr(self, f"_{engine_type}_pod_name")
            engine_pod_ip_list = getattr(self, f"_{engine_type}_pod_ip")
            engine_pod_host_ip_list = getattr(self, f"_{engine_type}_pod_host_ip")

        return engine_pod_name_list and engine_pod_ip_list and engine_pod_host_ip_list

    def deploy_engine(self, engine_type, object_id=None):
        """Deploys the engine with the given type.

        Args:
            engine_type: The type of engine to deploy.
            object_id: The object ID to deploy the engine with.

        Returns:
            A tuple of the pod name list, IP address list, and host IP address
            list of the deployed engine.
        """

        # attribute names use underscores (e.g. _graphlearn_torch_pod_name) while
        # engine_type may contain hyphens (e.g. "graphlearn-torch")
        attr_type = engine_type.replace("-", "_")
        if not self.check_if_engine_exist(engine_type, object_id):
            self._engine_pod_prefix = f"gs-{engine_type}-" + (
                f"{object_id}-" if object_id else ""
            ).replace("_", "-")
            self._config.kubernetes_launcher.engine.enable_gae = (
                engine_type == "analytical"
            )
            self._config.kubernetes_launcher.engine.enable_gae_java = (
                engine_type == "analytical-java"
            )
            self._config.kubernetes_launcher.engine.enable_gie = (
                engine_type == "interactive"
            )
            self._config.kubernetes_launcher.engine.enable_gle = (
                engine_type == "graphlearn"
            )
            self._config.kubernetes_launcher.engine.enable_glt = (
                engine_type == "graphlearn-torch"
            )

            self._engine_cluster = self._build_engine_cluster()
            response = self._create_engine_stateful_set()
            self._waiting_for_services_ready()

            if object_id:
                resource_object = getattr(self, f"_{attr_type}_resource_object")
                pod_name = getattr(self, f"_{attr_type}_pod_name")
                pod_ip = getattr(self, f"_{attr_type}_pod_ip")
                pod_host_ip = getattr(self, f"_{attr_type}_pod_host_ip")
                resource_object[object_id] = response
                pod_name[object_id] = self._pod_name_list
                pod_ip[object_id] = self._pod_ip_list
                pod_host_ip[object_id] = self._pod_host_ip_list
            else:
                # Set the engine pod info
                setattr(self, f"_{attr_type}_pod_name", self._pod_name_list)
                setattr(self, f"_{attr_type}_pod_ip", self._pod_ip_list)
                setattr(self, f"_{attr_type}_pod_host_ip", self._pod_host_ip_list)

        return (
            (
                getattr(self, f"_{attr_type}_pod_name")
                if object_id is None
                else getattr(self, f"_{attr_type}_pod_name")[object_id]
            ),
            (
                getattr(self, f"_{attr_type}_pod_ip")
                if object_id is None
                else getattr(self, f"_{attr_type}_pod_ip")[object_id]
            ),
            (
                getattr(self, f"_{attr_type}_pod_host_ip")
                if object_id is None
                else getattr(self, f"_{attr_type}_pod_host_ip")[object_id]
            ),
        )

    def delete_engine_stateful_set_with_object_id(self, engine_type, object_id):
        """delete the engine stateful set with the given object id.

        Args:
            engine_type(str): the type of engine
            object_id (int): The object id of the engine to delete.
        """
        resource_object = getattr(self, f"_{engine_type}_resource_object")
        obj = resource_object.get(object_id, {})
        if obj:
            delete_kubernetes_object(
                api_client=self._api_client,
                target=obj,
                wait=self._waiting_for_delete,
                timeout_seconds=self._timeout_seconds,
            )

            pod_name = getattr(self, f"_{engine_type}_pod_name")
            pod_ip = getattr(self, f"_{engine_type}_pod_ip")
            pod_host_ip = getattr(self, f"_{engine_type}_pod_host_ip")
            del resource_object[object_id]
            del pod_name[object_id]
            del pod_ip[object_id]
            del pod_host_ip[object_id]

    def deploy_analytical_engine(self):
        return self.deploy_engine("analytical")

    def deploy_analytical_java_engine(self):
        return self.deploy_engine("analytical-java")

    def deploy_interactive_engine(self, object_id):
        pod_name_list, pod_ip_list, pod_host_ip_list = self.deploy_engine(
            "interactive", object_id
        )
        try:
            response = self._core_api.read_namespaced_pod(
                pod_name_list[0], self._namespace
            )
        except K8SApiException:
            logger.exception(
                "Get pod %s error, please check if the pod is ready",
                pod_name_list[0],
            )
            raise
        owner_references = [
            kube_client.V1OwnerReference(
                api_version=response.metadata.owner_references[0].api_version,
                kind=response.metadata.owner_references[0].kind,
                name=response.metadata.owner_references[0].name,
                uid=response.metadata.owner_references[0].uid,
            )
        ]
        name = f"gs-interactive-frontend-{object_id}-{self._instance_id}"
        self._create_frontend_deployment(name, owner_references)

        return pod_name_list, pod_ip_list, pod_host_ip_list

    def deploy_graphlearn_engine(self, object_id):
        return self.deploy_engine("graphlearn", object_id)

    def deploy_graphlearn_torch_engine(self, object_id):
        return self.deploy_engine("graphlearn-torch", object_id)

    def delete_interactive_engine(self, object_id):
        self.delete_engine_stateful_set_with_object_id("interactive", object_id)

    def delete_graphlearn_engine(self, object_id):
        self.delete_engine_stateful_set_with_object_id("graphlearn", object_id)

    def delete_graphlearn_torch_engine(self, object_id):
        self.delete_engine_stateful_set_with_object_id("graphlearn-torch", object_id)

    def _allocate_interactive_engine(self, object_id):
        # check the interactive engine flag
        if not self._config.kubernetes_launcher.engine.enable_gie:
            raise NotImplementedError("Interactive engine not enabled")

        # allocate interactive engine based on the mode
        if self._deploy_mode == "eager":
            return self._pod_name_list, self._pod_ip_list, self._pod_host_ip_list
        return self.deploy_interactive_engine(object_id)

    def _distribute_interactive_process(
        self,
        hosts,
        object_id: int,
        schema_path: str,
        params: dict,
        with_cypher: bool,
        engine_selector: str,
    ):
        """
        Args:
            hosts (str): hosts of the graph.
            object_id (int): object id of the graph.
            schema_path (str): path of the schema file.
            engine_selector(str): the label selector of the engine.
        """
        env = os.environ.copy()
        env["GRAPHSCOPE_HOME"] = GRAPHSCOPE_HOME
        container = INTERACTIVE_EXECUTOR_CONTAINER_NAME

        params = "\n".join([f"{k}={v}" for k, v in params.items()])
        params = base64.b64encode(params.encode("utf-8")).decode("utf-8")
        neo4j_disabled = "true" if not with_cypher else "false"
        cmd = [
            INTERACTIVE_ENGINE_SCRIPT,
            "create_gremlin_instance_on_k8s",
            self._session_workspace,
            str(object_id),
            schema_path,
            hosts,
            container,
            str(self._interactive_port),  # executor port
            str(self._interactive_port + 1),  # executor rpc port
            str(self._interactive_port + 2),  # frontend gremlin port
            str(self._interactive_port + 3),  # frontend cypher port
            self._coordinator_name,
            engine_selector,
            neo4j_disabled,
            params,
        ]
        self._interactive_port += 4
        logger.info("Create GIE instance with command: %s", " ".join(cmd))
        process = subprocess.Popen(
            cmd,
            start_new_session=True,
            cwd=os.getcwd(),
            env=env,
            encoding="utf-8",
            errors="replace",
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            bufsize=1,
            universal_newlines=True,
        )
        return process

    def create_interactive_instance(
        self, object_id: int, schema_path: str, params: dict, with_cypher: bool
    ):
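        """Allocate engine pods for the graph and spawn the script that creates a
        GIE (gremlin, and optionally cypher) instance on them.

        Returns:
            The subprocess that runs the create script.
        """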
        pod_name_list, _, _ = self._allocate_interactive_engine(object_id)
        if not pod_name_list:
            raise RuntimeError("Failed to allocate interactive engine")
        hosts = ",".join(pod_name_list)

        engine_selector = "gs-engine-" + self._instance_id
        if self._deploy_mode == "lazy":
            engine_selector = (
                "gs-interactive-" + str(object_id) + "-" + self._instance_id
            )

        return self._distribute_interactive_process(
            hosts, object_id, schema_path, params, with_cypher, engine_selector
        )

    def close_interactive_instance(self, object_id):
        if self._deploy_mode == "lazy":
            logger.info("Close interactive instance with object id: %d", object_id)
            self.delete_interactive_engine(object_id)
            return None
        pod_name_list, _, _ = self._allocate_interactive_engine(object_id)
        hosts = ",".join(pod_name_list)
        env = os.environ.copy()
        env["GRAPHSCOPE_HOME"] = GRAPHSCOPE_HOME
        container = INTERACTIVE_EXECUTOR_CONTAINER_NAME
        cmd = [
            INTERACTIVE_ENGINE_SCRIPT,
            "close_gremlin_instance_on_k8s",
            self._session_workspace,
            str(object_id),
            hosts,
            container,
            self._instance_id,
        ]
        logger.info("Close GIE instance with command: %s", " ".join(cmd))
        process = subprocess.Popen(
            cmd,
            start_new_session=True,
            cwd=os.getcwd(),
            env=env,
            encoding="utf-8",
            errors="replace",
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            bufsize=1,
        )
        return process

    def _create_mars_scheduler(self):
        logger.info("Launching mars scheduler pod for GraphScope ...")
        deployment = self._mars_cluster.get_mars_deployment()
        deployment.metadata.owner_references = self._owner_references
        response = self._apps_api.create_namespaced_deployment(
            self._namespace, deployment
        )
        self._resource_object.append(response)

    # The function below injects vineyard as a sidecar container into the workload
    # and returns the new workload with the vineyard sidecar injected.
    #
    # Assume we have a workload json as below:
    #
    # {
    #  "apiVersion": "apps/v1",
    #  "kind": "Deployment",
    #  "metadata": {
    #    "name": "nginx-deployment",
    #    "namespace": "vineyard-job"
    #  },
    #  "spec": {
    #    "selector": {
    #      "matchLabels": {
    #        "app": "nginx"
    #      }
    #    },
    #    "template": {
    #      "metadata": {
    #        "labels": {
    #          "app": "nginx"
    #        }
    #      },
    #      "spec": {
    #        "containers": [
    #          {
    #            "name": "nginx",
    #            "image": "nginx:1.14.2",
    #            "ports": [
    #              {
    #                "containerPort": 80
    #              }
    #            ]
    #          }
    #        ]
    #      }
    #    }
    #  }
    # }
    #
    # The function will return a new workload json as below:
    #
    # {
    #  "apiVersion": "apps/v1",
    #  "kind": "Deployment",
    #  "metadata": {
    #    "creationTimestamp": null,
    #    "name": "nginx-deployment",
    #    "namespace": "vineyard-job"
    #  },
    #  "spec": {
    #    "selector": {
    #      "matchLabels": {
    #        "app": "nginx"
    #      }
    #    }
    #  },
    #  "template": {
    #    "metadata": null,
    #    "labels": {
    #      "app": "nginx",
    #      "app.vineyard.io/name": "vineyard-sidecar"
    #    },
    #    "spec": {
    #      "containers": [
    #        {
    #          "command": null,
    #          "image": "nginx:1.14.2",
    #          "name": "nginx",
    #          "ports": [
    #            {
    #              "containerPort": 80
    #            }
    #          ],
    #          "volumeMounts": [
    #            {
    #              "mountPath": "/var/run",
    #              "name": "vineyard-socket"
    #            }
    #          ]
    #        },
    #        {
    #          "command": [
    #            "/bin/bash",
    #            "-c",
    #            "/usr/bin/wait-for-it.sh -t 60 vineyard-sidecar-etcd-service.vineyard-job.svc.cluster.local:2379; \\\n
    #             sleep 1; /usr/local/bin/vineyardd --sync_crds true --socket /var/run/vineyard.sock --size 256Mi \\\n
    #             --stream_threshold 80 --etcd_cmd etcd --etcd_prefix /vineyard \\\n
    #             --etcd_endpoint http://vineyard-sidecar-etcd-service:2379\n"
    #          ],
    #          "env": [
    #            {
    #              "name": "VINEYARDD_UID",
    #              "value": null
    #            },
    #            {
    #              "name": "VINEYARDD_NAME",
    #              "value": "vineyard-sidecar"
    #            },
    #            {
    #              "name": "VINEYARDD_NAMESPACE",
    #              "value": "vineyard-job"
    #            }
    #          ],
    #          "image": "vineyardcloudnative/vineyardd:latest",
    #          "imagePullPolicy": "IfNotPresent",
    #          "name": "vineyard-sidecar",
    #          "ports": [
    #            {
    #              "containerPort": 9600,
    #              "name": "vineyard-rpc",
    #              "protocol": "TCP"
    #            }
    #          ],
    #          "volumeMounts": [
    #            {
    #              "mountPath": "/var/run",
    #              "name": "vineyard-socket"
    #            }
    #          ]
    #        }
    #      ],
    #      "volumes": [
    #        {
    #          "emptyDir": {},
    #          "name": "vineyard-socket"
    #        }
    #      ]
    #    }
    #  }
    # }

    def _inject_vineyard_as_sidecar(self, workload):
        import vineyard

        # create the annotations for the workload's template if they do not exist
        if workload.spec.template.metadata.annotations is None:
            workload.spec.template.metadata.annotations = {}

        # create the labels for the workload's template if they do not exist
        if workload.spec.template.metadata.labels is None:
            workload.spec.template.metadata.labels = {}

        workload_json = json.dumps(
            self._api_client.sanitize_for_serialization(workload)
        )

        sts_name = (
            f"{self._engine_cluster.engine_stateful_set_name}-{self._instance_id}"
        )

        owner_reference_json = self._get_owner_reference_as_json()
        # inject vineyard sidecar into the workload
        #
        # the name is used to specify the name of the sidecar container, which is also the
        # labelSelector of the rpc service and the etcd service.
        #
        # the apply_resources is used to apply resources to the kubernetes cluster during
        # the injection.
        #
        # for more details about vineyardctl inject, please refer to the link below:
        # https://github.com/v6d-io/v6d/tree/main/k8s/cmd#vineyardctl-inject

        new_workload_json = vineyard.deploy.vineyardctl.inject(
            kubeconfig=self._k8s_config_file,
            resource=workload_json,
            sidecar_volume_mountpath="/tmp/vineyard_workspace",
            name=sts_name + "-vineyard",
            apply_resources=True,
            owner_references=owner_reference_json,
            sidecar_image=self._vineyard_image,
            sidecar_cpu=self._vineyard_cpu,
            sidecar_memory=self._vineyard_mem,
            sidecar_service_type=self._service_type,
            output="json",
            capture=True,
        )

        normalized_workload_json = json.loads(new_workload_json)
        final_workload_json = json.loads(normalized_workload_json["workload"])

        fake_kube_response = FakeKubeResponse(final_workload_json)

        new_workload = self._api_client.deserialize(fake_kube_response, type(workload))
        return new_workload

    def _create_engine_stateful_set(self):
        logger.info("Creating engine pods...")

        stateful_set = self._engine_cluster.get_engine_stateful_set()
        if self._vineyard_deployment is not None:
            # schedule engine statefulset to the same node with vineyard deployment
            stateful_set = self._add_pod_affinity_for_vineyard_deployment(
                workload=stateful_set
            )
        else:
            stateful_set = self._inject_vineyard_as_sidecar(stateful_set)

        response = self._apps_api.create_namespaced_stateful_set(
            self._namespace, stateful_set
        )
        self._resource_object.append(response)
        return response

    def _create_frontend_deployment(self, name=None, owner_references=None):
        logger.info("Creating frontend pods...")
        deployment = self._engine_cluster.get_interactive_frontend_deployment()
        if name is not None:
            deployment.metadata.name = name
        deployment.metadata.owner_references = owner_references
        response = self._apps_api.create_namespaced_deployment(
            self._namespace, deployment
        )
        self._resource_object.append(response)

    def _create_frontend_service(self):
        logger.info("Creating frontend service...")
        service = self._engine_cluster.get_interactive_frontend_service(8233, 7687)
        service.metadata.owner_references = self._owner_references
        response = self._core_api.create_namespaced_service(self._namespace, service)
        self._resource_object.append(response)

    def _create_graphlearn_service(self, object_id):
        logger.info("Creating graphlearn service...")
        service = self._engine_cluster.get_graphlearn_service(
            object_id, self._graphlearn_start_port
        )
        service.metadata.owner_references = self._owner_references
        response = self._core_api.create_namespaced_service(self._namespace, service)
        self._graphlearn_services[object_id] = response
        self._resource_object.append(response)

    def _create_graphlearn_torch_service(self, object_id):
        logger.info("Creating graphlearn torch service...")
        service = self._engine_cluster.get_graphlearn_torch_service(
            object_id, self._graphlearn_torch_start_port
        )
        service.metadata.owner_references = self._owner_references
        response = self._core_api.create_namespaced_service(self._namespace, service)
        self._graphlearn_torch_services[object_id] = response
        self._resource_object.append(response)

    def get_engine_config(self):
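        """Return engine-related endpoints: the vineyard service name, the vineyard
        rpc endpoint, and the Mars endpoint when Mars is enabled."""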
        config = {
            "vineyard_service_name": self._engine_cluster.vineyard_service_name,
            "vineyard_rpc_endpoint": self._vineyard_service_endpoint,
        }
        if self._config.kubernetes_launcher.mars.enable:
            config["mars_endpoint"] = self._mars_service_endpoint
        return config

    def _create_services(self):
        self._create_engine_stateful_set()
        if self._config.kubernetes_launcher.engine.enable_gie:
            self._create_frontend_deployment(owner_references=self._owner_references)
            # self._create_frontend_service()
        if self._config.kubernetes_launcher.mars.enable:
            # scheduler used by Mars
            self._create_mars_scheduler()

    def _waiting_for_services_ready(self):
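        """Block until the engine statefulset reports all replicas ready (or the
        timeout expires), surfacing pod events along the way, then record the engine
        pod names/IPs and resolve the vineyard service endpoints."""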
        logger.info("Waiting for services ready...")
        selector = ""
        namespace = self._namespace
        start_time = time.time()
        event_messages = []
        while True:
            # TODO: Add label selector to filter out deployments.
            statefulsets = self._apps_api.list_namespaced_stateful_set(namespace)
            service_available = False
            for rs in statefulsets.items:
                if rs.metadata.name == self._engine_cluster.engine_stateful_set_name:
                    # compute the label selector of the engine pods first; it is also
                    # needed to list the pods once the service becomes available
                    labels = rs.spec.selector.match_labels
                    selector = ",".join(f"{k}={v}" for k, v in labels.items())
                    # logger.info(
                    #     "Engine pod: %s ready / %s total",
                    #     rs.status.ready_replicas,
                    #     self._num_workers,
                    # )
                    if rs.status.ready_replicas == self._num_workers:
                        # service is ready
                        service_available = True
                        break

                    # otherwise check the container status of each pod
                    pods = self._core_api.list_namespaced_pod(
                        namespace=namespace, label_selector=selector
                    )

                    for pod in pods.items:
                        pod_name = pod.metadata.name
                        field_selector = "involvedObject.name=" + pod_name
                        stream = kube_watch.Watch().stream(
                            self._core_api.list_namespaced_event,
                            namespace,
                            field_selector=field_selector,
                            timeout_seconds=1,
                        )
                        for event in stream:
                            msg = f"[{pod_name}]: {event['object'].message}"
                            if msg not in event_messages:
                                event_messages.append(msg)
                                logger.info(msg)
                                if event["object"].reason == "Failed":
                                    raise RuntimeError("Kubernetes event error: " + msg)

            if service_available:
                break
            if self._timeout_seconds + start_time < time.time():
                raise TimeoutError("GraphScope Engines launching timeout.")
            time.sleep(self._retry_time_seconds)

        self._pod_name_list = []
        self._pod_ip_list = []
        self._pod_host_ip_list = []
        pods = self._core_api.list_namespaced_pod(
            namespace=namespace, label_selector=selector
        )
        for pod in pods.items:
            self._pod_name_list.append(pod.metadata.name)
            self._pod_ip_list.append(pod.status.pod_ip)
            self._pod_host_ip_list.append(pod.status.host_ip)
        assert len(self._pod_ip_list) > 0

        self._vineyard_service_endpoint = (
            self._engine_cluster.get_vineyard_service_endpoint(self._api_client)
        )
        self._vineyard_internal_endpoint = (
            f"{self._pod_ip_list[0]}:{self._engine_cluster._vineyard_service_port}"
        )

        logger.info("GraphScope engines pod is ready.")
        logger.info("Engines pod name list: %s", self._pod_name_list)
        logger.info("Engines pod ip list: %s", self._pod_ip_list)
        logger.info("Engines pod host ip list: %s", self._pod_host_ip_list)
        logger.info("Vineyard service endpoint: %s", self._vineyard_service_endpoint)
        if self._config.kubernetes_launcher.mars.enable:
            self._mars_service_endpoint = self._mars_cluster.get_mars_service_endpoint(
                self._api_client
            )
            logger.info("Mars service endpoint: %s", self._mars_service_endpoint)

    # the function below adds a podAffinity to the engine workload so that the
    # workload is scheduled to the same node(s) as the vineyard deployment.
    # e.g. the vineyard deployment is named "vineyard-deployment" and the namespace is "graphscope-system",
    # the podAffinity will be added to the engine workload as below:
    # spec:
    #   affinity:
    #     podAffinity:
    #       requiredDuringSchedulingIgnoredDuringExecution:
    #       - labelSelector:
    #           matchExpressions:
    #           - key: app.kubernetes.io/instance
    #             operator: In
    #             values:
    #             - graphscope-system-vineyard-deployment # [vineyard deployment namespace]-[vineyard deployment name]
    #         topologyKey: kubernetes.io/hostname
    def _add_pod_affinity_for_vineyard_deployment(self, workload):
        import vineyard

        workload_json = json.dumps(
            self._api_client.sanitize_for_serialization(workload)
        )
        new_workload_json = vineyard.deploy.vineyardctl.schedule.workload(
            kubeconfig=self._k8s_config_file,
            resource=workload_json,
            vineyardd_name=self._vineyard_deployment,
            vineyardd_namespace=self._namespace,
            capture=True,
        )

        normalized_workload_json = json.loads(new_workload_json)
        fake_kube_response = FakeKubeResponse(normalized_workload_json)
        new_workload = self._api_client.deserialize(fake_kube_response, type(workload))
        return new_workload

    def _dump_resource_object(self):
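        """Persist the tracked kubernetes resources (plus the namespace, or the
        coordinator deployment and service) so they can be cleaned up later."""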
        resource = {}
        if self._delete_namespace:
            resource[self._namespace] = "Namespace"
        else:
            # coordinator info
            resource[self._coordinator_name] = "Deployment"
            resource[self._coordinator_service_name] = "Service"
        self._resource_object.dump(extra_resource=resource)

    def _get_analytical_hosts(self):
        pod_name_list = self._pod_name_list
        if self._analytical_pod_name:
            pod_name_list = self._analytical_pod_name
        return pod_name_list

    def _allocate_analytical_engine(self):
        # allocate analytical engine based on the mode
        if self._deploy_mode == "eager":
            return self._pod_name_list, self._pod_ip_list, self._pod_host_ip_list
        else:
            if self._config.kubernetes_launcher.engine.enable_gae:
                return self.deploy_analytical_engine()
            elif self._config.kubernetes_launcher.engine.enable_gae_java:
                return self.deploy_analytical_java_engine()
            else:
                logger.warning("Analytical engine is not enabled, skip allocating")
                return None, None, None

    def _distribute_analytical_process(self, pod_name_list, pod_ip_list):
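        """Generate a hostfile, copy it into every analytical pod, and launch the
        analytical engine through MPI with the given pods as hosts."""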
        # generate and distribute hostfile
        hosts = os.path.join(get_tempdir(), "hosts_of_nodes")
        with open(hosts, "w") as f:
            for i, pod_ip in enumerate(pod_ip_list):
                f.write(f"{pod_ip} {pod_name_list[i]}\n")

        container = ANALYTICAL_CONTAINER_NAME
        for pod in pod_name_list:
            logger.debug(
                run_kube_cp_command(hosts, "/tmp/hosts_of_nodes", pod, container, True)
            )

        # launch engine
        rmcp = ResolveMPICmdPrefix(rsh_agent=True)
        cmd, mpi_env = rmcp.resolve(self._num_workers, pod_name_list)

        cmd.append(ANALYTICAL_ENGINE_PATH)
        cmd.extend(["--host", "0.0.0.0"])
        cmd.extend(["--port", str(self._random_analytical_engine_rpc_port)])

        cmd.extend(["-v", str(self._glog_level)])
        mpi_env["GLOG_v"] = str(self._glog_level)

        cmd.extend(["--vineyard_socket", self._engine_cluster.vineyard_ipc_socket])
        logger.info("Analytical engine launching command: %s", " ".join(cmd))

        env = os.environ.copy()
        env["GRAPHSCOPE_HOME"] = GRAPHSCOPE_HOME
        env.update(mpi_env)

        self._analytical_engine_process = subprocess.Popen(
            cmd,
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            encoding="utf-8",
            errors="replace",
            universal_newlines=True,
            bufsize=1,
        )

        stdout_watcher = PipeWatcher(
            self._analytical_engine_process.stdout, sys.stdout, drop=True
        )
        stderr_watcher = PipeWatcher(
            self._analytical_engine_process.stderr, sys.stderr, drop=True
        )
        setattr(self._analytical_engine_process, "stdout_watcher", stdout_watcher)
        setattr(self._analytical_engine_process, "stderr_watcher", stderr_watcher)

    def create_analytical_instance(self):
        pod_name_list, pod_ip_list, _ = self._allocate_analytical_engine()
        if not pod_name_list or not pod_ip_list:
            raise RuntimeError("Failed to allocate analytical engine.")
        self._distribute_analytical_process(pod_name_list, pod_ip_list)
        self._analytical_engine_endpoint = (
            f"{self._pod_ip_list[0]}:{self._random_analytical_engine_rpc_port}"
        )
        logger.info(
            "GAE rpc service is listening on %s ...", self._analytical_engine_endpoint
        )

    def _delete_dangling_coordinator(self):
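        """Delete the coordinator service and deployment, optionally waiting until
        the deployment is gone."""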
        # delete service
        try:
            self._core_api.delete_namespaced_service(
                self._coordinator_service_name, self._namespace
            )
        except K8SApiException as ex:
            if ex.status == 404:
                logger.warning(
                    "coordinator service %s not found", self._coordinator_service_name
                )
            else:
                logger.exception(
                    "Deleting dangling coordinator service %s failed",
                    self._coordinator_service_name,
                )
        try:
            self._apps_api.delete_namespaced_deployment(
                self._coordinator_name, self._namespace
            )
        except K8SApiException as ex:
            if ex.status == 404:
                logger.warning(
                    "coordinator deployment %s not found", self._coordinator_name
                )
            else:
                logger.exception(
                    "Deleting dangling coordinator %s failed", self._coordinator_name
                )

        if self._waiting_for_delete:
            start_time = time.time()
            while True:
                try:
                    self._apps_api.read_namespaced_deployment(
                        self._coordinator_name, self._namespace
                    )
                except K8SApiException as ex:
                    if ex.status != 404:
                        logger.exception(
                            "Deleting dangling coordinator %s failed",
                            self._coordinator_name,
                        )
                    break
                else:
                    if time.time() - start_time > self._timeout_seconds:
                        logger.error(
                            "Deleting dangling coordinator %s timeout",
                            self._coordinator_name,
                        )
                        break
                    time.sleep(self._retry_time_seconds)

    def _get_owner_reference_as_json(self):
        if self._owner_references:
            owner_reference = [
                {
                    "apiVersion": self._owner_references[0].api_version,
                    "kind": self._owner_references[0].kind,
                    "name": self._owner_references[0].name,
                    "uid": self._owner_references[0].uid,
                }
            ]
            owner_reference_json = json.dumps(owner_reference)
        else:
            owner_reference_json = json.dumps([])
        return owner_reference_json

    def _check_if_vineyard_deployment_exist(self):
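        """Return True if the configured external vineyard deployment exists in the
        target namespace, False otherwise."""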
        if self._vineyard_deployment is None or self._vineyard_deployment == "":
            return False
        try:
            self._apps_api.read_namespaced_deployment(
                self._vineyard_deployment, self._namespace
            )
        except K8SApiException:
            logger.info(
                "Vineyard deployment %s/%s does not exist",
                self._namespace,
                self._vineyard_deployment,
            )
            return False
        return True

    def _deploy_vineyard_deployment_if_not_exist(self):
        if not self._check_if_vineyard_deployment_exist():
            self._deploy_vineyard_deployment()
        else:
            logger.info(
                "The external vineyard deployment %s is ready. "
                "Please make sure the type of its rpc service matches %s.",
                self._vineyard_deployment,
                self._service_type,
            )

    def _deploy_vineyard_deployment(self):
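        """Deploy an external vineyard deployment via vineyardctl and record the
        names of its pods."""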
        import vineyard

        owner_reference_json = self._get_owner_reference_as_json()
        vineyard.deploy.vineyardctl.deploy.vineyard_deployment(
            kubeconfig=self._k8s_config_file,
            name=self._vineyard_deployment,
            namespace=self._namespace,
            replicas=self._num_workers,
            etcd_replicas=1,
            vineyardd_image=self._vineyard_image,
            vineyardd_memory=self._vineyard_mem,
            vineyardd_cpu=self._vineyard_cpu,
            vineyardd_service_type=self._service_type,
            owner_references=owner_reference_json,
        )
        vineyard_pods = self._core_api.list_namespaced_pod(
            self._namespace,
            label_selector=f"app.kubernetes.io/instance={self._namespace}-{self._vineyard_deployment}",
        )
        self._vineyard_pod_name_list.extend(
            [pod.metadata.name for pod in vineyard_pods.items]
        )

    def start(self):
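        """Create the engine pods and services (eager mode only) and wait until they
        are ready; return True on success and False on failure."""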
        if self._serving:
            return True
        try:
            if self._deploy_mode == "eager":
                self._create_services()
                self._waiting_for_services_ready()
                self._dump_resource_object()
                self._serving = True
        except Exception:  # pylint: disable=broad-except
            time.sleep(1)
            logger.exception("Error when launching GraphScope on kubernetes cluster")
            self.stop()
            return False
        return True

    def stop(self, is_dangling=False):
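        """Delete every kubernetes resource created by this launcher; for a dangling
        coordinator, also remove the namespace or the coordinator deployment and
        service."""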
        if self._serving:
            logger.info("Cleaning up kubernetes resources")
            for target in self._resource_object:
                delete_kubernetes_object(
                    api_client=self._api_client,
                    target=target,
                    wait=self._waiting_for_delete,
                    timeout_seconds=self._timeout_seconds,
                )
            self._resource_object.clear()

            if is_dangling:
                logger.info("Dangling coordinator detected, cleaning up...")
                # delete everything inside namespace of graphscope instance
                if self._delete_namespace:
                    # delete namespace created by graphscope
                    self._core_api.delete_namespace(self._namespace)
                    if self._waiting_for_delete:
                        start_time = time.time()
                        while True:
                            try:
                                self._core_api.read_namespace(self._namespace)
                            except K8SApiException as ex:
                                if ex.status != 404:
                                    logger.exception(
                                        "Deleting dangling namespace %s failed",
                                        self._namespace,
                                    )
                                break
                            else:
                                if time.time() - start_time > self._timeout_seconds:
                                    logger.error(
                                        "Deleting namespace %s timeout", self._namespace
                                    )
                                    break
                                time.sleep(self._retry_time_seconds)

                else:
                    # delete coordinator deployment and service
                    self._delete_dangling_coordinator()
            self._serving = False
            logger.info("Kubernetes launcher stopped")

    def _allocate_graphlearn_engine(self, object_id):
        # check the graphlearn engine flag
        if not self._config.kubernetes_launcher.engine.enable_gle:
            raise NotImplementedError("GraphLearn engine not enabled")

        # allocate graphlearn engine based on the mode
        if self._deploy_mode == "eager":
            return self._pod_name_list, self._pod_ip_list, self._pod_host_ip_list
        return self.deploy_graphlearn_engine(object_id)

    def _allocate_graphlearn_torch_engine(self, object_id):
        # check the graphlearn torch engine flag
        if not self._config.kubernetes_launcher.engine.enable_glt:
            raise NotImplementedError("GraphLearn torch engine not enabled")

        # allocate graphlearn torch engine based on the mode
        if self._deploy_mode == "eager":
            return self._pod_name_list, self._pod_ip_list, self._pod_host_ip_list
        return self.deploy_graphlearn_torch_engine(object_id)

    def _distribute_graphlearn_process(
        self, pod_name_list, pod_host_ip_list, object_id, handle, config
    ):
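        """Decode the handle, fill in the graphlearn server endpoints, launch a
        learning server in each engine pod via ``kubectl exec``, create the
        graphlearn service, and return the service endpoints."""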
        # allocate service for ports
        # prepare arguments
        handle = json.loads(
            base64.b64decode(handle.encode("utf-8", errors="ignore")).decode(
                "utf-8", errors="ignore"
            )
        )
        hosts = ",".join(
            [
                f"{pod_name}:{port}"
                for pod_name, port in zip(
                    pod_name_list,
                    self._engine_cluster.get_graphlearn_ports(
                        self._graphlearn_start_port
                    ),
                )
            ]
        )
        handle["server"] = hosts
        handle = base64.b64encode(
            json.dumps(handle).encode("utf-8", errors="ignore")
        ).decode("utf-8", errors="ignore")

        # launch the server
        self._graphlearn_instance_processes[object_id] = []
        for pod_index, pod in enumerate(self._pod_name_list):
            container = GRAPHLEARN_CONTAINER_NAME
            sub_cmd = f"python3 -m gscoordinator.launch_graphlearn {handle} {config} {pod_index}"
            cmd = f"kubectl -n {self._namespace} exec -it -c {container} {pod} -- {sub_cmd}"
            # logger.debug("launching learning server: %s", " ".join(cmd))
            proc = subprocess.Popen(
                shlex.split(cmd),
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                encoding="utf-8",
                errors="replace",
                universal_newlines=True,
                bufsize=1,
            )
            stdout_watcher = PipeWatcher(
                proc.stdout,
                sys.stdout,
                drop=True,
                suppressed=(not logger.isEnabledFor(logging.DEBUG)),
            )
            setattr(proc, "stdout_watcher", stdout_watcher)
            self._graphlearn_instance_processes[object_id].append(proc)

        # Create Service
        self._create_graphlearn_service(object_id)
        # update the port usage record
        self._graphlearn_start_port += len(pod_name_list)
        # parse the service hosts and ports
        return self._engine_cluster.get_graphlearn_service_endpoint(
            self._api_client, object_id, pod_host_ip_list
        )

    def _distribute_graphlearn_torch_process(
        self, pod_name_list, pod_ip_list, object_id, handle, config
    ):
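        """Decode the handle, launch a graphlearn-torch server in each engine pod
        via ``kubectl exec``, create the service and the client PyTorchJob, wait for
        the job to finish, and return the server address list."""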
        # allocate service for ports
        # prepare arguments
        handle = json.loads(
            base64.b64decode(handle.encode("utf-8", errors="ignore")).decode(
                "utf-8", errors="ignore"
            )
        )

        ports = self._engine_cluster.get_graphlearn_torch_ports(
            self._graphlearn_torch_start_port
        )
        handle["master_addr"] = pod_ip_list[0]
        handle["server_client_master_port"] = ports[0]
        server_list = [f"{pod_ip_list[0]}:{ports[i]}" for i in range(4)]

        server_handle = base64.b64encode(
            json.dumps(handle).encode("utf-8", errors="ignore")
        ).decode("utf-8", errors="ignore")

        # launch the server
        self._graphlearn_torch_instance_processes[object_id] = []
        for pod_index, pod in enumerate(self._pod_name_list):
            container = GRAPHLEARN_TORCH_CONTAINER_NAME
            sub_cmd = f"env PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python \
                python3 -m gscoordinator.launch_graphlearn_torch \
                {server_handle} {config} {pod_index}"
            cmd = f"kubectl -n {self._namespace} exec -it -c {container} {pod} -- {sub_cmd}"
            # logger.debug("launching learning server: %s", " ".join(cmd))
            proc = subprocess.Popen(
                shlex.split(cmd),
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                encoding="utf-8",
                errors="replace",
                universal_newlines=True,
                bufsize=1,
            )
            stdout_watcher = PipeWatcher(
                proc.stdout,
                sys.stdout,
                suppressed=(not logger.isEnabledFor(logging.DEBUG)),
            )

            time.sleep(5)
            logger.debug("process status: %s", proc.poll())

            setattr(proc, "stdout_watcher", stdout_watcher)
            self._graphlearn_torch_instance_processes[object_id].append(proc)

        # Create Service
        self._create_graphlearn_torch_service(object_id)
        # update the port usage record
        self._graphlearn_torch_start_port += len(pod_name_list)

        # prepare config map for client scripts
        config_map = kube_client.V1ConfigMap(
            api_version="v1",
            kind="ConfigMap",
            metadata=kube_client.V1ObjectMeta(
                name="graphlearn-torch-client-config",
                namespace=self._namespace,
            ),
            data=handle["client_content"],
        )
        self._core_api.create_namespaced_config_map(self._namespace, config_map)

        # prepare the manifest
        pytorch_job_manifest = replace_string_in_dict(
            handle["manifest"], "${MASTER_ADDR}", handle["master_addr"]
        )
        # parse the pytorchjob yaml
        group = pytorch_job_manifest["apiVersion"].split("/")[0]
        version = pytorch_job_manifest["apiVersion"].split("/")[1]
        name = pytorch_job_manifest["metadata"]["name"]
        namespace = pytorch_job_manifest["metadata"]["namespace"]
        plural = "pytorchjobs"  # This is PyTorchJob CRD's plural name

        try:
            # create PyTorchJob
            api_response = self._pytorchjobs_api.create_namespaced_custom_object(
                group=group,
                version=version,
                namespace=namespace,
                plural=plural,
                body=pytorch_job_manifest,
            )
            logger.info(api_response)
        except K8SApiException as e:
            logger.info(
                f"Exception when calling CustomObjectsApi->create_namespaced_custom_object: {e}"
            )
            raise

        # set Watcher to monitor the state of the PyTorchJob
        w = kube_watch.Watch()

        # loop checking the state of PyTorchJob
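        # (w.stream() keeps yielding events until w.stop() is called on a terminal condition)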
        for event in w.stream(
            self._pytorchjobs_api.list_namespaced_custom_object,
            group,
            version,
            namespace,
            plural,
        ):
            pytorch_job = event["object"]
            if pytorch_job.get("metadata", {}).get("name") == name:
                status = pytorch_job.get("status", {})
                if status:  # check status existence
                    conditions = status.get("conditions", [])
                    for condition in conditions:
                        if (
                            condition.get("type") == "Succeeded"
                            and condition.get("status") == "True"
                        ):
                            logger.info(f"PyTorchJob {name} has succeeded!")
                            w.stop()
                            break
                        elif (
                            condition.get("type") == "Failed"
                            and condition.get("status") == "True"
                        ):
                            logger.info(f"PyTorchJob {name} has failed!")
                            w.stop()
                            break

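        # the PyTorchJob has reached a terminal state (Succeeded or Failed);
        # clean up the job and its ConfigMap either way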
        self.close_graphlearn_torch_client(group, name, version, plural, namespace)

        return server_list

    def create_learning_instance(self, object_id, handle, config, learning_backend):
        if learning_backend == message_pb2.LearningBackend.GRAPHLEARN:
            pod_name_list, _, pod_host_ip_list = self._allocate_graphlearn_engine(
                object_id
            )
            if not pod_name_list or not pod_host_ip_list:
                raise RuntimeError("Failed to allocate learning engine")
            return self._distribute_graphlearn_process(
                pod_name_list, pod_host_ip_list, object_id, handle, config
            )
        elif learning_backend == message_pb2.LearningBackend.GRAPHLEARN_TORCH:
            (
                pod_name_list,
                pod_ip_list,
                pod_host_ip_list,
            ) = self._allocate_graphlearn_torch_engine(object_id)
            if not pod_name_list or not pod_host_ip_list:
                raise RuntimeError("Failed to allocate learning engine")
            return self._distribute_graphlearn_torch_process(
                pod_name_list, pod_ip_list, object_id, handle, config
            )
        else:
            raise ValueError("invalid learning backend")

    def close_learning_instance(self, object_id, learning_backend):
        if learning_backend == message_pb2.LearningBackend.GRAPHLEARN:
            self.close_graphlearn_instance(object_id)
        elif learning_backend == message_pb2.LearningBackend.GRAPHLEARN_TORCH:
            self.close_graphlearn_torch_instance(object_id)
        else:
            raise ValueError("invalid learning backend")

    def close_graphlearn_instance(self, object_id):
        if self._deploy_mode == "lazy":
            self.delete_graphlearn_engine(object_id)
            return
        if object_id not in self._graphlearn_instance_processes:
            return
        # delete the services
        target = self._graphlearn_services[object_id]
        try:
            delete_kubernetes_object(
                api_client=self._api_client,
                target=target,
                wait=self._waiting_for_delete,
                timeout_seconds=self._timeout_seconds,
            )
        except Exception:  # pylint: disable=broad-except
            logger.exception("Failed to delete graphlearn service for %s", object_id)

        # terminate the process
        for proc in self._graphlearn_instance_processes[object_id]:
            try:
                proc.terminate()
                proc.wait(1)
            except Exception:  # pylint: disable=broad-except
                logger.exception("Failed to terminate graphlearn server")
        self._graphlearn_instance_processes[object_id].clear()

    def close_graphlearn_torch_instance(self, object_id):
        if self._deploy_mode == "lazy":
            self.delete_graphlearn_torch_engine(object_id)
            return
        if object_id not in self._graphlearn_torch_instance_processes:
            return
        # delete the services
        target = self._graphlearn_torch_services[object_id]
        try:
            delete_kubernetes_object(
                api_client=self._api_client,
                target=target,
                wait=self._waiting_for_delete,
                timeout_seconds=self._timeout_seconds,
            )
        except Exception:  # pylint: disable=broad-except
            logger.exception(
                "Failed to delete graphlearn torch service for %s", object_id
            )

        # terminate the process
        for proc in self._graphlearn_torch_instance_processes[object_id]:
            try:
                proc.terminate()
                proc.wait(1)
            except Exception:  # pylint: disable=broad-except
                logger.exception("Failed to terminate graphlearn torch server")
        self._graphlearn_torch_instance_processes[object_id].clear()

    def close_graphlearn_torch_client(self, group, name, version, plural, namespace):
        # delete the PyTorchJob custom object
        logger.info(f"Deleting PyTorchJob {name}...")
        try:
            response = self._pytorchjobs_api.delete_namespaced_custom_object(
                group=group,
                name=name,
                version=version,
                plural=plural,
                namespace=namespace,
                body=kube_client.V1DeleteOptions(
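                    # "Foreground" cascading deletion removes dependent objects (pods)
                    # before the PyTorchJob itself is deleted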
                    propagation_policy="Foreground",
                ),
            )
            logger.info(f"PyTorchJob {name} deleted. Response: {response}")
        except K8SApiException as e:
            logger.info(
                f"Exception when calling CustomObjectsApi->delete_namespaced_custom_object: {e}"
            )

        try:
            response = self._core_api.delete_namespaced_config_map(
                name="graphlearn-torch-client-config",
                namespace=self._namespace,
            )
            logger.info(
                f"ConfigMap graphlearn-torch-client-config deleted. Response: {response}"
            )
        except K8SApiException as e:
            logger.info(
                f"Exception when calling CoreV1Api->delete_namespaced_config_map: {e}"
            )


class ResourceManager(object):
    """A class to manager kubernetes object.

    Object managed by this class will dump meta info to disk file
    for pod preStop lifecycle management.

    meta info format:

        {
            "my-deployment": "Deployment",
            "my-service": "Service"
        }
    """

    # fixed, well-known path so the pod preStop hook can locate the dumped meta info
    _resource_object_path = os.path.join(get_tempdir(), "resource_object")

    def __init__(self, api_client):
        self._api_client = api_client
        self._resource_object = []
        self._meta_info = {}

    def append(self, target):
        self._resource_object.append(target)
        self._meta_info.update(
            get_kubernetes_object_info(api_client=self._api_client, target=target)
        )
        self.dump()

    def extend(self, targets):
        self._resource_object.extend(targets)
        for target in targets:
            self._meta_info.update(
                get_kubernetes_object_info(api_client=self._api_client, target=target)
            )
        self.dump()

    def clear(self):
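        # clears only the in-memory records; the on-disk dump is not rewritten here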
        self._resource_object.clear()
        self._meta_info.clear()

    def __str__(self):
        return str(self._meta_info)

    def __getitem__(self, index):
        return self._resource_object[index]

    def dump(self, extra_resource=None):
        """Dump meta info to disk file.
        Args:
            extra_resource (dict): extra resource to dump.
                A typical scenario is dumping meta info of namespace
                for coordinator dangling processing.
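
        Example (illustrative):
            resource_manager.dump(extra_resource={"my-namespace": "Namespace"})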
        """
        if extra_resource is not None:
            rlt = copy.deepcopy(self._meta_info)
            rlt.update(extra_resource)
        else:
            rlt = self._meta_info
        with open(self._resource_object_path, "w") as f:
            json.dump(rlt, f)
