coordinator/gscoordinator/kubernetes_launcher.py
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import base64
import copy
import json
import logging
import os
import random
import shlex
import subprocess
import sys
import time
from graphscope.proto import message_pb2
from gscoordinator.cluster_builder import EngineCluster
from gscoordinator.cluster_builder import MarsCluster
try:
from kubernetes import client as kube_client
from kubernetes import config as kube_config
from kubernetes import watch as kube_watch
from kubernetes.client import AppsV1Api
from kubernetes.client import CoreV1Api
from kubernetes.client.rest import ApiException as K8SApiException
from kubernetes.config import ConfigException as K8SConfigException
except ImportError:
kube_client = None
kube_config = None
kube_watch = None
AppsV1Api = None
CoreV1Api = None
K8SApiException = None
K8SConfigException = None
from graphscope.config import Config
from graphscope.deploy.kubernetes.utils import delete_kubernetes_object
from graphscope.deploy.kubernetes.utils import get_kubernetes_object_info
from graphscope.deploy.kubernetes.utils import resolve_api_client
from graphscope.framework.utils import PipeWatcher
from graphscope.framework.utils import get_tempdir
from graphscope.proto import types_pb2
from gscoordinator.constants import ANALYTICAL_CONTAINER_NAME
from gscoordinator.constants import GRAPHLEARN_CONTAINER_NAME
from gscoordinator.constants import GRAPHLEARN_TORCH_CONTAINER_NAME
from gscoordinator.constants import INTERACTIVE_EXECUTOR_CONTAINER_NAME
from gscoordinator.launcher import AbstractLauncher
from gscoordinator.utils import ANALYTICAL_ENGINE_PATH
from gscoordinator.utils import GRAPHSCOPE_HOME
from gscoordinator.utils import INTERACTIVE_ENGINE_SCRIPT
from gscoordinator.utils import WORKSPACE
from gscoordinator.utils import ResolveMPICmdPrefix
from gscoordinator.utils import delegate_command_to_pod
from gscoordinator.utils import parse_as_glog_level
from gscoordinator.utils import replace_string_in_dict
from gscoordinator.utils import run_kube_cp_command
logger = logging.getLogger("graphscope")
class FakeKubeResponse:
def __init__(self, obj):
self.data = json.dumps(obj)
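# Illustrative sketch (not part of the launcher logic): ``ApiClient.deserialize``
# only reads the ``.data`` attribute of the response object it is given, so
# wrapping a plain dict in FakeKubeResponse lets us convert JSON produced by
# vineyardctl back into typed Kubernetes models, e.g.
#
#   api_client = kube_client.ApiClient()
#   sts_dict = api_client.sanitize_for_serialization(stateful_set)
#   restored = api_client.deserialize(FakeKubeResponse(sts_dict), "V1StatefulSet")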
class KubernetesClusterLauncher(AbstractLauncher):
def __init__(self, config: Config):
super().__init__()
self._serving = False
self._api_client = resolve_api_client()
self._core_api = kube_client.CoreV1Api(self._api_client)
self._apps_api = kube_client.AppsV1Api(self._api_client)
self._pytorchjobs_api = kube_client.CustomObjectsApi(self._api_client)
self._resource_object = ResourceManager(self._api_client)
self._config: Config = config
self._config.kubernetes_launcher.engine.post_setup()
launcher_config = config.kubernetes_launcher
# glog level
self._glog_level = parse_as_glog_level(config.log_level)
# Session Config
self._num_workers = config.session.num_workers
self._instance_id = config.session.instance_id
self._timeout_seconds = config.session.timeout_seconds
self._retry_time_seconds = config.session.retry_time_seconds
# Vineyard Config
# self._vineyard_socket = config.vineyard.socket
self._vineyard_rpc_port = config.vineyard.rpc_port
self._vineyard_deployment = config.vineyard.deployment_name
# Launcher Config
self._namespace = launcher_config.namespace
self._delete_namespace = launcher_config.delete_namespace
# Coordinator Config
self._coordinator_name = config.coordinator.deployment_name
self._coordinator_service_name = self._coordinator_name
self._image_registry = launcher_config.image.registry
self._image_repository = launcher_config.image.repository
self._image_tag = launcher_config.image.tag
self._image_pull_policy = launcher_config.image.pull_policy
self._image_pull_secrets = launcher_config.image.pull_secrets
self._vineyard_resource = config.vineyard.resource
self._volumes = launcher_config.volumes
self._owner_references = self.get_coordinator_owner_references()
self._engine_pod_prefix = "gs-engine-"
self._vineyard_image = config.vineyard.image
self._vineyard_mem = config.vineyard.resource.requests.memory
self._vineyard_cpu = config.vineyard.resource.requests.cpu
self._service_type = launcher_config.service_type
self._waiting_for_delete = launcher_config.waiting_for_delete
# check the validity of deploy mode
self._deploy_mode = launcher_config.deployment_mode
if self._deploy_mode not in ["eager", "lazy"]:
logger.error(
"Invalid mode %s, choose from 'eager' or 'lazy'. Proceeding with default mode: 'eager'",
self._deploy_mode,
)
self._deploy_mode = "eager"
self._vineyard_pod_name_list = []
# set the kube config file
self._k8s_config_file = launcher_config.config_file
if self._k8s_config_file is None:
self._k8s_config_file = os.environ.get("KUBECONFIG", "~/.kube/config")
if self._vineyard_deployment is not None:
self._deploy_vineyard_deployment_if_not_exist()
            # check again whether the vineyard deployment is ready
            if not self._check_if_vineyard_deployment_exist():
                # if not ready, fall back to running without an external vineyard deployment
                logger.error(
                    "Vineyard deployment %s is not ready, please check the deployment status. "
                    "Proceeding without an external vineyard deployment.",
                    self._vineyard_deployment,
                )
self._vineyard_deployment = None
        # Lazy mode requires an external vineyard deployment: without one, each
        # engine pod would run its own vineyard instance and the vineyard
        # objects could not be shared between engine pods. Report an error and
        # fall back to eager mode in that case.
        if self._deploy_mode == "lazy" and self._vineyard_deployment is None:
            logger.error(
                "Lazy mode requires a vineyard deployment; "
                "specify one with k8s_vineyard_deployment='vineyardd-sample'. "
                "Proceeding with default mode: 'eager'"
            )
            self._deploy_mode = "eager"
self._pod_name_list = []
self._pod_ip_list = []
self._pod_host_ip_list = []
# analytical engine
self._analytical_pod_name = []
self._analytical_pod_ip = []
self._analytical_pod_host_ip = []
# analytical java engine
self._analytical_java_pod_name = []
self._analytical_java_pod_ip = []
self._analytical_java_pod_host_ip = []
# interactive engine
self._interactive_resource_object = {}
self._interactive_pod_name = {}
self._interactive_pod_ip = {}
self._interactive_pod_host_ip = {}
# graphlearn engine
self._graphlearn_resource_object = {}
self._graphlearn_pod_name = {}
self._graphlearn_pod_ip = {}
self._graphlearn_pod_host_ip = {}
# graphlearn_torch engine
self._graphlearn_torch_resource_object = {}
self._graphlearn_torch_pod_name = {}
self._graphlearn_torch_pod_ip = {}
self._graphlearn_torch_pod_host_ip = {}
self._analytical_engine_endpoint = None
self._mars_service_endpoint = None
self._analytical_engine_process = None
self._random_analytical_engine_rpc_port = random.randint(56001, 57000)
        # Interactive engine: base port. Four consecutive ports are used per
        # instance: executor inter-process port, executor rpc port, frontend
        # gremlin port, and frontend cypher port
        # (see _distribute_interactive_process).
        self._interactive_port = 8233
# 8000 ~ 9000 is exposed
self._graphlearn_start_port = 8000
# 9001 ~ 10001 is exposed
self._graphlearn_torch_start_port = 9001
self._graphlearn_services = {}
self._graphlearn_instance_processes = {}
self._graphlearn_torch_services = {}
self._graphlearn_torch_instance_processes = {}
# workspace
self._instance_workspace = os.path.join(WORKSPACE, self._instance_id)
os.makedirs(self._instance_workspace, exist_ok=True)
self._session_workspace = None
self._engine_cluster = self._build_engine_cluster()
self._vineyard_socket = self._engine_cluster.vineyard_ipc_socket
self._vineyard_service_endpoint = None
        self._vineyard_internal_endpoint = None
self._mars_service_endpoint = None
if self._config.kubernetes_launcher.mars.enable:
self._mars_cluster = MarsCluster(
self._instance_id, self._namespace, self._service_type
)
def __del__(self):
self.stop()
def type(self):
return types_pb2.K8S
    # Build the EngineCluster helper from the current config. Which engine
    # containers are added to the engine statefulset is controlled by the
    # enable_* flags in config.kubernetes_launcher.engine.
def _build_engine_cluster(self):
return EngineCluster(
config=self._config,
engine_pod_prefix=self._engine_pod_prefix,
graphlearn_start_port=self._graphlearn_start_port,
graphlearn_torch_start_port=self._graphlearn_torch_start_port,
)
def get_coordinator_owner_references(self):
owner_references = []
if self._coordinator_name:
try:
deployment = self._apps_api.read_namespaced_deployment(
self._coordinator_name, self._namespace
)
owner_references.append(
kube_client.V1OwnerReference(
api_version="apps/v1",
kind="Deployment",
name=self._coordinator_name,
uid=deployment.metadata.uid,
)
)
except K8SApiException:
logger.error("Coordinator %s not found", self._coordinator_name)
return owner_references
def waiting_for_delete(self):
return self._waiting_for_delete
def get_namespace(self):
return self._namespace
def get_vineyard_stream_info(self):
if self._vineyard_deployment is not None:
hosts = [
f"{self._namespace}:{host}" for host in self._vineyard_pod_name_list
]
else:
hosts = [f"{self._namespace}:{host}" for host in self._pod_name_list]
return "kubernetes", hosts
def set_session_workspace(self, session_id):
self._session_workspace = os.path.join(self._instance_workspace, session_id)
os.makedirs(self._session_workspace, exist_ok=True)
def launch_etcd(self):
pass
def configure_etcd_endpoint(self):
pass
@property
def hosts(self):
"""list of pod name"""
return self._pod_name_list
@property
def hosts_list(self):
return self._get_analytical_hosts()
@property
def vineyard_endpoint(self) -> str:
if self._check_if_vineyard_deployment_exist():
return self._vineyard_service_endpoint
else:
return self._vineyard_internal_endpoint
def distribute_file(self, path):
pod_name_list, _, _ = self._allocate_analytical_engine()
for pod in pod_name_list:
container = ANALYTICAL_CONTAINER_NAME
try:
# The library may exist in the analytical pod.
test_cmd = f"test -f {path}"
logger.debug(delegate_command_to_pod(test_cmd, pod, container))
logger.info("Library exists, skip distribute")
except RuntimeError:
cmd = f"mkdir -p {os.path.dirname(path)}"
logger.debug(delegate_command_to_pod(cmd, pod, container))
logger.debug(run_kube_cp_command(path, path, pod, container, True))
def close_analytical_instance(self):
pass
def launch_vineyard(self):
"""Launch vineyardd in k8s cluster."""
# vineyardd is auto launched in vineyardd container
# args = f"vineyardd \
# -socket {self._engine_cluster._sock} -etcd_endpoint http://{self._pod_ip_list[0]}:2379"
pass
def close_etcd(self):
# etcd is managed by vineyard
pass
def close_vineyard(self):
# No need to close vineyardd
# Use delete deployment instead
pass
def check_if_engine_exist(self, engine_type, object_id=None):
"""Checks if the engine with the given type exists.
Args:
engine_type: The type of engine to check for.
object_id: The object id of the engine to check for.
Returns:
True if the engine exists, False otherwise.
"""
        # Engine types may contain hyphens (e.g. "analytical-java") while the
        # corresponding attributes use underscores, so normalize before getattr.
        engine_type = engine_type.replace("-", "_")
        if object_id:
engine_pod_name_dict = getattr(self, f"_{engine_type}_pod_name")
engine_pod_name_list = engine_pod_name_dict.get(object_id, [])
engine_pod_ip_dict = getattr(self, f"_{engine_type}_pod_ip")
engine_pod_ip_list = engine_pod_ip_dict.get(object_id, [])
engine_pod_host_ip_dict = getattr(self, f"_{engine_type}_pod_host_ip")
engine_pod_host_ip_list = engine_pod_host_ip_dict.get(object_id, [])
else:
engine_pod_name_list = getattr(self, f"_{engine_type}_pod_name")
engine_pod_ip_list = getattr(self, f"_{engine_type}_pod_ip")
engine_pod_host_ip_list = getattr(self, f"_{engine_type}_pod_host_ip")
return engine_pod_name_list and engine_pod_ip_list and engine_pod_host_ip_list
def deploy_engine(self, engine_type, object_id=None):
"""Deploys the engine with the given type.
Args:
engine_type: The type of engine to deploy.
object_id: The object ID to deploy the engine with.
Returns:
A tuple of the pod names, IP addresses, and host IP addresses of the
deployed engine and the response of the engine and service.
"""
        # Engine types may contain hyphens (e.g. "graphlearn-torch") while the
        # corresponding attributes use underscores, so keep a normalized form
        # for the getattr/setattr calls below.
        engine_attr = engine_type.replace("-", "_")
        if not self.check_if_engine_exist(engine_type, object_id):
self._engine_pod_prefix = f"gs-{engine_type}-" + (
f"{object_id}-" if object_id else ""
).replace("_", "-")
self._config.kubernetes_launcher.engine.enable_gae = (
engine_type == "analytical"
)
self._config.kubernetes_launcher.engine.enable_gae_java = (
engine_type == "analytical-java"
)
self._config.kubernetes_launcher.engine.enable_gie = (
engine_type == "interactive"
)
self._config.kubernetes_launcher.engine.enable_gle = (
engine_type == "graphlearn"
)
self._config.kubernetes_launcher.engine.enable_glt = (
engine_type == "graphlearn-torch"
)
self._engine_cluster = self._build_engine_cluster()
response = self._create_engine_stateful_set()
self._waiting_for_services_ready()
            if object_id:
                resource_object = getattr(self, f"_{engine_attr}_resource_object")
                pod_name = getattr(self, f"_{engine_attr}_pod_name")
                pod_ip = getattr(self, f"_{engine_attr}_pod_ip")
                pod_host_ip = getattr(self, f"_{engine_attr}_pod_host_ip")
                resource_object[object_id] = response
                pod_name[object_id] = self._pod_name_list
                pod_ip[object_id] = self._pod_ip_list
                pod_host_ip[object_id] = self._pod_host_ip_list
            else:
                # Set the engine pod info
                setattr(self, f"_{engine_attr}_pod_name", self._pod_name_list)
                setattr(self, f"_{engine_attr}_pod_ip", self._pod_ip_list)
                setattr(self, f"_{engine_attr}_pod_host_ip", self._pod_host_ip_list)
        return (
            (
                getattr(self, f"_{engine_attr}_pod_name")
                if object_id is None
                else getattr(self, f"_{engine_attr}_pod_name")[object_id]
            ),
            (
                getattr(self, f"_{engine_attr}_pod_ip")
                if object_id is None
                else getattr(self, f"_{engine_attr}_pod_ip")[object_id]
            ),
            (
                getattr(self, f"_{engine_attr}_pod_host_ip")
                if object_id is None
                else getattr(self, f"_{engine_attr}_pod_host_ip")[object_id]
            ),
        )
def delete_engine_stateful_set_with_object_id(self, engine_type, object_id):
"""delete the engine stateful set with the given object id.
Args:
engine_type(str): the type of engine
object_id (int): The object id of the engine to delete.
"""
        # Attribute names use underscores while engine types may contain hyphens.
        engine_type = engine_type.replace("-", "_")
        resource_object = getattr(self, f"_{engine_type}_resource_object")
obj = resource_object.get(object_id, {})
if obj:
delete_kubernetes_object(
api_client=self._api_client,
target=obj,
wait=self._waiting_for_delete,
timeout_seconds=self._timeout_seconds,
)
pod_name = getattr(self, f"_{engine_type}_pod_name")
pod_ip = getattr(self, f"_{engine_type}_pod_ip")
pod_host_ip = getattr(self, f"_{engine_type}_pod_host_ip")
del resource_object[object_id]
del pod_name[object_id]
del pod_ip[object_id]
del pod_host_ip[object_id]
def deploy_analytical_engine(self):
return self.deploy_engine("analytical")
def deploy_analytical_java_engine(self):
return self.deploy_engine("analytical-java")
def deploy_interactive_engine(self, object_id):
pod_name_list, pod_ip_list, pod_host_ip_list = self.deploy_engine(
"interactive", object_id
)
        try:
            response = self._core_api.read_namespaced_pod(
                pod_name_list[0], self._namespace
            )
        except K8SApiException:
            logger.exception(
                "Get pod %s error, please check if the pod is ready",
                pod_name_list[0],
            )
            # re-raise: the owner references below require a valid pod response
            raise
owner_references = [
kube_client.V1OwnerReference(
api_version=response.metadata.owner_references[0].api_version,
kind=response.metadata.owner_references[0].kind,
name=response.metadata.owner_references[0].name,
uid=response.metadata.owner_references[0].uid,
)
]
name = f"gs-interactive-frontend-{object_id}-{self._instance_id}"
self._create_frontend_deployment(name, owner_references)
return pod_name_list, pod_ip_list, pod_host_ip_list
def deploy_graphlearn_engine(self, object_id):
return self.deploy_engine("graphlearn", object_id)
def deploy_graphlearn_torch_engine(self, object_id):
return self.deploy_engine("graphlearn-torch", object_id)
def delete_interactive_engine(self, object_id):
self.delete_engine_stateful_set_with_object_id("interactive", object_id)
def delete_graphlearn_engine(self, object_id):
self.delete_engine_stateful_set_with_object_id("graphlearn", object_id)
def delete_graphlearn_torch_engine(self, object_id):
self.delete_engine_stateful_set_with_object_id("graphlearn-torch", object_id)
def _allocate_interactive_engine(self, object_id):
# check the interactive engine flag
if not self._config.kubernetes_launcher.engine.enable_gie:
raise NotImplementedError("Interactive engine not enabled")
# allocate analytical engine based on the mode
if self._deploy_mode == "eager":
return self._pod_name_list, self._pod_ip_list, self._pod_host_ip_list
return self.deploy_interactive_engine(object_id)
def _distribute_interactive_process(
self,
hosts,
object_id: int,
schema_path: str,
params: dict,
with_cypher: bool,
engine_selector: str,
):
"""
Args:
hosts (str): hosts of the graph.
object_id (int): object id of the graph.
schema_path (str): path of the schema file.
engine_selector(str): the label selector of the engine.
"""
env = os.environ.copy()
env["GRAPHSCOPE_HOME"] = GRAPHSCOPE_HOME
container = INTERACTIVE_EXECUTOR_CONTAINER_NAME
params = "\n".join([f"{k}={v}" for k, v in params.items()])
params = base64.b64encode(params.encode("utf-8")).decode("utf-8")
neo4j_disabled = "true" if not with_cypher else "false"
cmd = [
INTERACTIVE_ENGINE_SCRIPT,
"create_gremlin_instance_on_k8s",
self._session_workspace,
str(object_id),
schema_path,
hosts,
container,
str(self._interactive_port), # executor port
str(self._interactive_port + 1), # executor rpc port
str(self._interactive_port + 2), # frontend gremlin port
str(self._interactive_port + 3), # frontend cypher port
self._coordinator_name,
engine_selector,
neo4j_disabled,
params,
]
self._interactive_port += 4
logger.info("Create GIE instance with command: %s", " ".join(cmd))
process = subprocess.Popen(
cmd,
start_new_session=True,
cwd=os.getcwd(),
env=env,
encoding="utf-8",
errors="replace",
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
)
return process
def create_interactive_instance(
self, object_id: int, schema_path: str, params: dict, with_cypher: bool
):
pod_name_list, _, _ = self._allocate_interactive_engine(object_id)
if not pod_name_list:
raise RuntimeError("Failed to allocate interactive engine")
hosts = ",".join(pod_name_list)
engine_selector = "gs-engine-" + self._instance_id
if self._deploy_mode == "lazy":
engine_selector = (
"gs-interactive-" + str(object_id) + "-" + self._instance_id
)
return self._distribute_interactive_process(
hosts, object_id, schema_path, params, with_cypher, engine_selector
)
def close_interactive_instance(self, object_id):
if self._deploy_mode == "lazy":
logger.info("Close interactive instance with object id: %d", object_id)
self.delete_interactive_engine(object_id)
return None
pod_name_list, _, _ = self._allocate_interactive_engine(object_id)
hosts = ",".join(pod_name_list)
env = os.environ.copy()
env["GRAPHSCOPE_HOME"] = GRAPHSCOPE_HOME
container = INTERACTIVE_EXECUTOR_CONTAINER_NAME
cmd = [
INTERACTIVE_ENGINE_SCRIPT,
"close_gremlin_instance_on_k8s",
self._session_workspace,
str(object_id),
hosts,
container,
self._instance_id,
]
logger.info("Close GIE instance with command: %s", " ".join(cmd))
process = subprocess.Popen(
cmd,
start_new_session=True,
cwd=os.getcwd(),
env=env,
encoding="utf-8",
errors="replace",
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
bufsize=1,
)
return process
def _create_mars_scheduler(self):
logger.info("Launching mars scheduler pod for GraphScope ...")
deployment = self._mars_cluster.get_mars_deployment()
deployment.metadata.owner_references = self._owner_references
response = self._apps_api.create_namespaced_deployment(
self._namespace, deployment
)
self._resource_object.append(response)
    # Inject vineyard as a sidecar container into the given workload and return
    # a new workload object with the vineyard sidecar injected.
#
# Assume we have a workload json as below:
#
# {
# "apiVersion": "apps/v1",
# "kind": "Deployment",
# "metadata": {
# "name": "nginx-deployment",
# "namespace": "vineyard-job"
# },
# "spec": {
# "selector": {
# "matchLabels": {
# "app": "nginx"
# }
# },
# "template": {
# "metadata": {
# "labels": {
# "app": "nginx"
# }
# },
# "spec": {
# "containers": [
# {
# "name": "nginx",
# "image": "nginx:1.14.2",
# "ports": [
# {
# "containerPort": 80
# }
# ]
# }
# ]
# }
# }
# }
# }
#
# The function will return a new workload json as below:
#
# {
# "apiVersion": "apps/v1",
# "kind": "Deployment",
# "metadata": {
# "creationTimestamp": null,
# "name": "nginx-deployment",
# "namespace": "vineyard-job"
# },
# "spec": {
# "selector": {
# "matchLabels": {
# "app": "nginx"
# }
# }
# },
# "template": {
# "metadata": null,
# "labels": {
# "app": "nginx",
# "app.vineyard.io/name": "vineyard-sidecar"
# },
# "spec": {
# "containers": [
# {
# "command": null,
# "image": "nginx:1.14.2",
# "name": "nginx",
# "ports": [
# {
# "containerPort": 80
# }
# ],
# "volumeMounts": [
# {
# "mountPath": "/var/run",
# "name": "vineyard-socket"
# }
# ]
# },
# {
# "command": [
# "/bin/bash",
# "-c",
# "/usr/bin/wait-for-it.sh -t 60 vineyard-sidecar-etcd-service.vineyard-job.svc.cluster.local:2379; \\\n
# sleep 1; /usr/local/bin/vineyardd --sync_crds true --socket /var/run/vineyard.sock --size 256Mi \\\n
# --stream_threshold 80 --etcd_cmd etcd --etcd_prefix /vineyard \\\n
# --etcd_endpoint http://vineyard-sidecar-etcd-service:2379\n"
# ],
# "env": [
# {
# "name": "VINEYARDD_UID",
# "value": null
# },
# {
# "name": "VINEYARDD_NAME",
# "value": "vineyard-sidecar"
# },
# {
# "name": "VINEYARDD_NAMESPACE",
# "value": "vineyard-job"
# }
# ],
# "image": "vineyardcloudnative/vineyardd:latest",
# "imagePullPolicy": "IfNotPresent",
# "name": "vineyard-sidecar",
# "ports": [
# {
# "containerPort": 9600,
# "name": "vineyard-rpc",
# "protocol": "TCP"
# }
# ],
# "volumeMounts": [
# {
# "mountPath": "/var/run",
# "name": "vineyard-socket"
# }
# ]
# }
# ],
# "volumes": [
# {
# "emptyDir": {},
# "name": "vineyard-socket"
# }
# ]
# }
# }
# }
def _inject_vineyard_as_sidecar(self, workload):
import vineyard
# create the annotations for the workload's template if not exists
if workload.spec.template.metadata.annotations is None:
workload.spec.template.metadata.annotations = {}
# create the labels for the workload's template if not exists
if workload.spec.template.metadata.labels is None:
workload.spec.template.metadata.labels = {}
workload_json = json.dumps(
self._api_client.sanitize_for_serialization(workload)
)
sts_name = (
f"{self._engine_cluster.engine_stateful_set_name}-{self._instance_id}"
)
owner_reference_json = self._get_owner_reference_as_json()
# inject vineyard sidecar into the workload
#
# the name is used to specify the name of the sidecar container, which is also the
# labelSelector of the rpc service and the etcd service.
#
# the apply_resources is used to apply resources to the kubernetes cluster during
# the injection.
#
# for more details about vineyardctl inject, please refer to the link below:
# https://github.com/v6d-io/v6d/tree/main/k8s/cmd#vineyardctl-inject
new_workload_json = vineyard.deploy.vineyardctl.inject(
kubeconfig=self._k8s_config_file,
resource=workload_json,
sidecar_volume_mountpath="/tmp/vineyard_workspace",
name=sts_name + "-vineyard",
apply_resources=True,
owner_references=owner_reference_json,
sidecar_image=self._vineyard_image,
sidecar_cpu=self._vineyard_cpu,
sidecar_memory=self._vineyard_mem,
sidecar_service_type=self._service_type,
output="json",
capture=True,
)
normalized_workload_json = json.loads(new_workload_json)
final_workload_json = json.loads(normalized_workload_json["workload"])
fake_kube_response = FakeKubeResponse(final_workload_json)
new_workload = self._api_client.deserialize(fake_kube_response, type(workload))
return new_workload
def _create_engine_stateful_set(self):
logger.info("Creating engine pods...")
stateful_set = self._engine_cluster.get_engine_stateful_set()
if self._vineyard_deployment is not None:
# schedule engine statefulset to the same node with vineyard deployment
stateful_set = self._add_pod_affinity_for_vineyard_deployment(
workload=stateful_set
)
else:
stateful_set = self._inject_vineyard_as_sidecar(stateful_set)
response = self._apps_api.create_namespaced_stateful_set(
self._namespace, stateful_set
)
self._resource_object.append(response)
return response
def _create_frontend_deployment(self, name=None, owner_references=None):
logger.info("Creating frontend pods...")
deployment = self._engine_cluster.get_interactive_frontend_deployment()
if name is not None:
deployment.metadata.name = name
deployment.metadata.owner_references = owner_references
response = self._apps_api.create_namespaced_deployment(
self._namespace, deployment
)
self._resource_object.append(response)
def _create_frontend_service(self):
logger.info("Creating frontend service...")
service = self._engine_cluster.get_interactive_frontend_service(8233, 7687)
service.metadata.owner_references = self._owner_references
response = self._core_api.create_namespaced_service(self._namespace, service)
self._resource_object.append(response)
def _create_graphlearn_service(self, object_id):
logger.info("Creating graphlearn service...")
service = self._engine_cluster.get_graphlearn_service(
object_id, self._graphlearn_start_port
)
service.metadata.owner_references = self._owner_references
response = self._core_api.create_namespaced_service(self._namespace, service)
self._graphlearn_services[object_id] = response
self._resource_object.append(response)
def _create_graphlearn_torch_service(self, object_id):
logger.info("Creating graphlearn torch service...")
service = self._engine_cluster.get_graphlearn_torch_service(
object_id, self._graphlearn_torch_start_port
)
service.metadata.owner_references = self._owner_references
response = self._core_api.create_namespaced_service(self._namespace, service)
self._graphlearn_torch_services[object_id] = response
self._resource_object.append(response)
def get_engine_config(self):
config = {
"vineyard_service_name": self._engine_cluster.vineyard_service_name,
"vineyard_rpc_endpoint": self._vineyard_service_endpoint,
}
if self._config.kubernetes_launcher.mars.enable:
config["mars_endpoint"] = self._mars_service_endpoint
return config
def _create_services(self):
self._create_engine_stateful_set()
if self._config.kubernetes_launcher.engine.enable_gie:
self._create_frontend_deployment(owner_references=self._owner_references)
# self._create_frontend_service()
if self._config.kubernetes_launcher.mars.enable:
# scheduler used by Mars
self._create_mars_scheduler()
def _waiting_for_services_ready(self):
logger.info("Waiting for services ready...")
selector = ""
namespace = self._namespace
start_time = time.time()
event_messages = []
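        # Poll the engine statefulset until all replicas are ready. While
        # waiting, surface pod events to the log; a "Failed" event aborts the
        # launch, and exceeding the configured timeout raises TimeoutError.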
while True:
# TODO: Add label selector to filter out deployments.
statefulsets = self._apps_api.list_namespaced_stateful_set(namespace)
service_available = False
for rs in statefulsets.items:
if rs.metadata.name == self._engine_cluster.engine_stateful_set_name:
# logger.info(
# "Engine pod: %s ready / %s total",
# rs.status.ready_replicas,
# self._num_workers,
# )
if rs.status.ready_replicas == self._num_workers:
# service is ready
service_available = True
break
# check container status
labels = rs.spec.selector.match_labels
selector = ",".join(f"{k}={v}" for k, v in labels.items())
pods = self._core_api.list_namespaced_pod(
namespace=namespace, label_selector=selector
)
for pod in pods.items:
pod_name = pod.metadata.name
field_selector = "involvedObject.name=" + pod_name
stream = kube_watch.Watch().stream(
self._core_api.list_namespaced_event,
namespace,
field_selector=field_selector,
timeout_seconds=1,
)
for event in stream:
msg = f"[{pod_name}]: {event['object'].message}"
if msg not in event_messages:
event_messages.append(msg)
logger.info(msg)
if event["object"].reason == "Failed":
raise RuntimeError("Kubernetes event error: " + msg)
if service_available:
break
if self._timeout_seconds + start_time < time.time():
raise TimeoutError("GraphScope Engines launching timeout.")
time.sleep(self._retry_time_seconds)
self._pod_name_list = []
self._pod_ip_list = []
self._pod_host_ip_list = []
pods = self._core_api.list_namespaced_pod(
namespace=namespace, label_selector=selector
)
for pod in pods.items:
self._pod_name_list.append(pod.metadata.name)
self._pod_ip_list.append(pod.status.pod_ip)
self._pod_host_ip_list.append(pod.status.host_ip)
assert len(self._pod_ip_list) > 0
self._vineyard_service_endpoint = (
self._engine_cluster.get_vineyard_service_endpoint(self._api_client)
)
self._vineyard_internal_endpoint = (
f"{self._pod_ip_list[0]}:{self._engine_cluster._vineyard_service_port}"
)
logger.info("GraphScope engines pod is ready.")
logger.info("Engines pod name list: %s", self._pod_name_list)
logger.info("Engines pod ip list: %s", self._pod_ip_list)
logger.info("Engines pod host ip list: %s", self._pod_host_ip_list)
logger.info("Vineyard service endpoint: %s", self._vineyard_service_endpoint)
if self._config.kubernetes_launcher.mars.enable:
self._mars_service_endpoint = self._mars_cluster.get_mars_service_endpoint(
self._api_client
)
logger.info("Mars service endpoint: %s", self._mars_service_endpoint)
    # The function adds a podAffinity term to the engine workload so that the
    # workload is scheduled onto the same nodes as the vineyard deployment.
    # e.g. if the vineyard deployment is named "vineyard-deployment" and the namespace is "graphscope-system",
    # the podAffinity added to the engine workload looks like:
# spec:
# affinity:
# podAffinity:
# requiredDuringSchedulingIgnoredDuringExecution:
# - labelSelector:
# matchExpressions:
# - key: app.kubernetes.io/instance
# operator: In
# values:
# - graphscope-system-vineyard-deployment # [vineyard deployment namespace]-[vineyard deployment name]
# topologyKey: kubernetes.io/hostname
def _add_pod_affinity_for_vineyard_deployment(self, workload):
import vineyard
workload_json = json.dumps(
self._api_client.sanitize_for_serialization(workload)
)
new_workload_json = vineyard.deploy.vineyardctl.schedule.workload(
kubeconfig=self._k8s_config_file,
resource=workload_json,
vineyardd_name=self._vineyard_deployment,
vineyardd_namespace=self._namespace,
capture=True,
)
normalized_workload_json = json.loads(new_workload_json)
fake_kube_response = FakeKubeResponse(normalized_workload_json)
new_workload = self._api_client.deserialize(fake_kube_response, type(workload))
return new_workload
def _dump_resource_object(self):
resource = {}
if self._delete_namespace:
resource[self._namespace] = "Namespace"
else:
# coordinator info
resource[self._coordinator_name] = "Deployment"
resource[self._coordinator_service_name] = "Service"
self._resource_object.dump(extra_resource=resource)
def _get_analytical_hosts(self):
pod_name_list = self._pod_name_list
if self._analytical_pod_name:
pod_name_list = self._analytical_pod_name
return pod_name_list
def _allocate_analytical_engine(self):
# allocate analytical engine based on the mode
if self._deploy_mode == "eager":
return self._pod_name_list, self._pod_ip_list, self._pod_host_ip_list
else:
if self._config.kubernetes_launcher.engine.enable_gae:
return self.deploy_analytical_engine()
elif self._config.kubernetes_launcher.engine.enable_gae_java:
return self.deploy_analytical_java_engine()
            else:
                # Keep behavior consistent with the other allocators when the
                # analytical engine is not enabled.
                raise NotImplementedError("Analytical engine not enabled")
def _distribute_analytical_process(self, pod_name_list, pod_ip_list):
# generate and distribute hostfile
hosts = os.path.join(get_tempdir(), "hosts_of_nodes")
with open(hosts, "w") as f:
for i, pod_ip in enumerate(pod_ip_list):
f.write(f"{pod_ip} {pod_name_list[i]}\n")
container = ANALYTICAL_CONTAINER_NAME
for pod in pod_name_list:
logger.debug(
run_kube_cp_command(hosts, "/tmp/hosts_of_nodes", pod, container, True)
)
# launch engine
rmcp = ResolveMPICmdPrefix(rsh_agent=True)
cmd, mpi_env = rmcp.resolve(self._num_workers, pod_name_list)
cmd.append(ANALYTICAL_ENGINE_PATH)
cmd.extend(["--host", "0.0.0.0"])
cmd.extend(["--port", str(self._random_analytical_engine_rpc_port)])
cmd.extend(["-v", str(self._glog_level)])
mpi_env["GLOG_v"] = str(self._glog_level)
cmd.extend(["--vineyard_socket", self._engine_cluster.vineyard_ipc_socket])
logger.info("Analytical engine launching command: %s", " ".join(cmd))
env = os.environ.copy()
env["GRAPHSCOPE_HOME"] = GRAPHSCOPE_HOME
env.update(mpi_env)
self._analytical_engine_process = subprocess.Popen(
cmd,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf-8",
errors="replace",
universal_newlines=True,
bufsize=1,
)
stdout_watcher = PipeWatcher(
self._analytical_engine_process.stdout, sys.stdout, drop=True
)
stderr_watcher = PipeWatcher(
self._analytical_engine_process.stderr, sys.stderr, drop=True
)
setattr(self._analytical_engine_process, "stdout_watcher", stdout_watcher)
setattr(self._analytical_engine_process, "stderr_watcher", stderr_watcher)
def create_analytical_instance(self):
pod_name_list, pod_ip_list, _ = self._allocate_analytical_engine()
if not pod_name_list or not pod_ip_list:
raise RuntimeError("Failed to allocate analytical engine.")
self._distribute_analytical_process(pod_name_list, pod_ip_list)
self._analytical_engine_endpoint = (
f"{self._pod_ip_list[0]}:{self._random_analytical_engine_rpc_port}"
)
logger.info(
"GAE rpc service is listening on %s ...", self._analytical_engine_endpoint
)
def _delete_dangling_coordinator(self):
# delete service
try:
self._core_api.delete_namespaced_service(
self._coordinator_service_name, self._namespace
)
except K8SApiException as ex:
if ex.status == 404:
logger.warning(
"coordinator service %s not found", self._coordinator_service_name
)
else:
logger.exception(
"Deleting dangling coordinator service %s failed",
self._coordinator_service_name,
)
try:
self._apps_api.delete_namespaced_deployment(
self._coordinator_name, self._namespace
)
except K8SApiException as ex:
if ex.status == 404:
logger.warning(
"coordinator deployment %s not found", self._coordinator_name
)
else:
logger.exception(
"Deleting dangling coordinator %s failed", self._coordinator_name
)
if self._waiting_for_delete:
start_time = time.time()
while True:
try:
self._apps_api.read_namespaced_deployment(
self._coordinator_name, self._namespace
)
except K8SApiException as ex:
if ex.status != 404:
logger.exception(
"Deleting dangling coordinator %s failed",
self._coordinator_name,
)
break
else:
if time.time() - start_time > self._timeout_seconds:
logger.error(
"Deleting dangling coordinator %s timeout",
self._coordinator_name,
)
time.sleep(self._retry_time_seconds)
def _get_owner_reference_as_json(self):
if self._owner_references:
owner_reference = [
{
"apiVersion": self._owner_references[0].api_version,
"kind": self._owner_references[0].kind,
"name": self._owner_references[0].name,
"uid": self._owner_references[0].uid,
}
]
owner_reference_json = json.dumps(owner_reference)
else:
owner_reference_json = json.dumps([])
return owner_reference_json
def _check_if_vineyard_deployment_exist(self):
if self._vineyard_deployment is None or self._vineyard_deployment == "":
return False
try:
self._apps_api.read_namespaced_deployment(
self._vineyard_deployment, self._namespace
)
except K8SApiException:
logger.info(
"Vineyard deployment %s/%s not exist",
self._namespace,
self._vineyard_deployment,
)
return False
return True
def _deploy_vineyard_deployment_if_not_exist(self):
if not self._check_if_vineyard_deployment_exist():
self._deploy_vineyard_deployment()
else:
            logger.info(
                "The external vineyard deployment %s is ready. "
                "Please make sure the type of the vineyard rpc service is the same as %s.",
                self._vineyard_deployment,
                self._service_type,
            )
def _deploy_vineyard_deployment(self):
import vineyard
owner_reference_json = self._get_owner_reference_as_json()
vineyard.deploy.vineyardctl.deploy.vineyard_deployment(
kubeconfig=self._k8s_config_file,
name=self._vineyard_deployment,
namespace=self._namespace,
replicas=self._num_workers,
etcd_replicas=1,
vineyardd_image=self._vineyard_image,
vineyardd_memory=self._vineyard_mem,
vineyardd_cpu=self._vineyard_cpu,
vineyardd_service_type=self._service_type,
owner_references=owner_reference_json,
)
vineyard_pods = self._core_api.list_namespaced_pod(
self._namespace,
label_selector=f"app.kubernetes.io/instance={self._namespace}-{self._vineyard_deployment}",
)
self._vineyard_pod_name_list.extend(
[pod.metadata.name for pod in vineyard_pods.items]
)
def start(self):
if self._serving:
return True
try:
if self._deploy_mode == "eager":
self._create_services()
self._waiting_for_services_ready()
self._dump_resource_object()
self._serving = True
except Exception: # pylint: disable=broad-except
time.sleep(1)
logger.exception("Error when launching GraphScope on kubernetes cluster")
self.stop()
return False
return True
def stop(self, is_dangling=False):
if self._serving:
logger.info("Cleaning up kubernetes resources")
for target in self._resource_object:
delete_kubernetes_object(
api_client=self._api_client,
target=target,
wait=self._waiting_for_delete,
timeout_seconds=self._timeout_seconds,
)
self._resource_object.clear()
if is_dangling:
logger.info("Dangling coordinator detected, cleaning up...")
# delete everything inside namespace of graphscope instance
if self._delete_namespace:
# delete namespace created by graphscope
self._core_api.delete_namespace(self._namespace)
if self._waiting_for_delete:
start_time = time.time()
while True:
try:
self._core_api.read_namespace(self._namespace)
except K8SApiException as ex:
if ex.status != 404:
logger.exception(
"Deleting dangling namespace %s failed",
self._namespace,
)
break
else:
if time.time() - start_time > self._timeout_seconds:
logger.error(
"Deleting namespace %s timeout", self._namespace
)
time.sleep(self._retry_time_seconds)
else:
# delete coordinator deployment and service
self._delete_dangling_coordinator()
self._serving = False
logger.info("Kubernetes launcher stopped")
def _allocate_graphlearn_engine(self, object_id):
# check the graphlearn engine flag
if not self._config.kubernetes_launcher.engine.enable_gle:
raise NotImplementedError("GraphLearn engine not enabled")
# allocate graphlearn engine based on the mode
if self._deploy_mode == "eager":
return self._pod_name_list, self._pod_ip_list, self._pod_host_ip_list
return self.deploy_graphlearn_engine(object_id)
def _allocate_graphlearn_torch_engine(self, object_id):
# check the graphlearn torch engine flag
if not self._config.kubernetes_launcher.engine.enable_glt:
raise NotImplementedError("GraphLearn torch engine not enabled")
# allocate graphlearn engine based on the mode
if self._deploy_mode == "eager":
return self._pod_name_list, self._pod_ip_list, self._pod_host_ip_list
return self.deploy_graphlearn_torch_engine(object_id)
def _distribute_graphlearn_process(
self, pod_name_list, pod_host_ip_list, object_id, handle, config
):
# allocate service for ports
# prepare arguments
handle = json.loads(
base64.b64decode(handle.encode("utf-8", errors="ignore")).decode(
"utf-8", errors="ignore"
)
)
hosts = ",".join(
[
f"{pod_name}:{port}"
for pod_name, port in zip(
pod_name_list,
self._engine_cluster.get_graphlearn_ports(
self._graphlearn_start_port
),
)
]
)
handle["server"] = hosts
handle = base64.b64encode(
json.dumps(handle).encode("utf-8", errors="ignore")
).decode("utf-8", errors="ignore")
# launch the server
self._graphlearn_instance_processes[object_id] = []
for pod_index, pod in enumerate(self._pod_name_list):
container = GRAPHLEARN_CONTAINER_NAME
sub_cmd = f"python3 -m gscoordinator.launch_graphlearn {handle} {config} {pod_index}"
cmd = f"kubectl -n {self._namespace} exec -it -c {container} {pod} -- {sub_cmd}"
# logger.debug("launching learning server: %s", " ".join(cmd))
proc = subprocess.Popen(
shlex.split(cmd),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
encoding="utf-8",
errors="replace",
universal_newlines=True,
bufsize=1,
)
stdout_watcher = PipeWatcher(
proc.stdout,
sys.stdout,
drop=True,
suppressed=(not logger.isEnabledFor(logging.DEBUG)),
)
setattr(proc, "stdout_watcher", stdout_watcher)
self._graphlearn_instance_processes[object_id].append(proc)
# Create Service
self._create_graphlearn_service(object_id)
# update the port usage record
self._graphlearn_start_port += len(pod_name_list)
# parse the service hosts and ports
return self._engine_cluster.get_graphlearn_service_endpoint(
self._api_client, object_id, pod_host_ip_list
)
def _distribute_graphlearn_torch_process(
self, pod_name_list, pod_ip_list, object_id, handle, config
):
# allocate service for ports
# prepare arguments
handle = json.loads(
base64.b64decode(handle.encode("utf-8", errors="ignore")).decode(
"utf-8", errors="ignore"
)
)
ports = self._engine_cluster.get_graphlearn_torch_ports(
self._graphlearn_torch_start_port
)
handle["master_addr"] = pod_ip_list[0]
handle["server_client_master_port"] = ports[0]
server_list = [f"{pod_ip_list[0]}:{ports[i]}" for i in range(4)]
server_handle = base64.b64encode(
json.dumps(handle).encode("utf-8", errors="ignore")
).decode("utf-8", errors="ignore")
# launch the server
self._graphlearn_torch_instance_processes[object_id] = []
for pod_index, pod in enumerate(self._pod_name_list):
container = GRAPHLEARN_TORCH_CONTAINER_NAME
sub_cmd = f"env PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python \
python3 -m gscoordinator.launch_graphlearn_torch \
{server_handle} {config} {pod_index}"
cmd = f"kubectl -n {self._namespace} exec -it -c {container} {pod} -- {sub_cmd}"
# logger.debug("launching learning server: %s", " ".join(cmd))
proc = subprocess.Popen(
shlex.split(cmd),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
encoding="utf-8",
errors="replace",
universal_newlines=True,
bufsize=1,
)
stdout_watcher = PipeWatcher(
proc.stdout,
sys.stdout,
suppressed=(not logger.isEnabledFor(logging.DEBUG)),
)
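            # Give the server process a moment to start before checking whether
            # it exited immediately.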
time.sleep(5)
logger.debug("process status: %s", proc.poll())
setattr(proc, "stdout_watcher", stdout_watcher)
self._graphlearn_torch_instance_processes[object_id].append(proc)
# Create Service
self._create_graphlearn_torch_service(object_id)
# update the port usage record
self._graphlearn_torch_start_port += len(pod_name_list)
# prepare config map for client scripts
config_map = kube_client.V1ConfigMap(
api_version="v1",
kind="ConfigMap",
metadata=kube_client.V1ObjectMeta(
name="graphlearn-torch-client-config",
namespace=self._namespace,
),
data=handle["client_content"],
)
self._core_api.create_namespaced_config_map(self._namespace, config_map)
# prepare the manifest
pytorch_job_manifest = replace_string_in_dict(
handle["manifest"], "${MASTER_ADDR}", handle["master_addr"]
)
# parse the pytorchjob yaml
group = pytorch_job_manifest["apiVersion"].split("/")[0]
version = pytorch_job_manifest["apiVersion"].split("/")[1]
name = pytorch_job_manifest["metadata"]["name"]
namespace = pytorch_job_manifest["metadata"]["namespace"]
plural = "pytorchjobs" # This is PyTorchJob CRD's plural name
try:
# create PyTorchJob
api_response = self._pytorchjobs_api.create_namespaced_custom_object(
group=group,
version=version,
namespace=namespace,
plural=plural,
body=pytorch_job_manifest,
)
logger.info(api_response)
except K8SApiException as e:
logger.info(
f"Exception when calling CustomObjectsApi->create_namespaced_custom_object: {e}"
)
raise
# set Watcher to monitor the state of the PyTorchJob
w = kube_watch.Watch()
# loop checking the state of PyTorchJob
for event in w.stream(
self._pytorchjobs_api.list_namespaced_custom_object,
group,
version,
namespace,
plural,
):
pytorch_job = event["object"]
if pytorch_job.get("metadata", {}).get("name") == name:
status = pytorch_job.get("status", {})
if status: # check status existence
conditions = status.get("conditions", [])
for condition in conditions:
if (
condition.get("type") == "Succeeded"
and condition.get("status") == "True"
):
logger.info(f"PyTorchJob {name} has succeeded!")
w.stop()
break
elif (
condition.get("type") == "Failed"
and condition.get("status") == "True"
):
logger.info(f"PyTorchJob {name} has failed!")
w.stop()
break
self.close_graphlearn_torch_client(group, name, version, plural, namespace)
return server_list
def create_learning_instance(self, object_id, handle, config, learning_backend):
if learning_backend == message_pb2.LearningBackend.GRAPHLEARN:
pod_name_list, _, pod_host_ip_list = self._allocate_graphlearn_engine(
object_id
)
if not pod_name_list or not pod_host_ip_list:
raise RuntimeError("Failed to allocate learning engine")
return self._distribute_graphlearn_process(
pod_name_list, pod_host_ip_list, object_id, handle, config
)
elif learning_backend == message_pb2.LearningBackend.GRAPHLEARN_TORCH:
(
pod_name_list,
pod_ip_list,
pod_host_ip_list,
) = self._allocate_graphlearn_torch_engine(object_id)
if not pod_name_list or not pod_host_ip_list:
raise RuntimeError("Failed to allocate learning engine")
return self._distribute_graphlearn_torch_process(
pod_name_list, pod_ip_list, object_id, handle, config
)
else:
raise ValueError("invalid learning backend")
def close_learning_instance(self, object_id, learning_backend):
if learning_backend == message_pb2.LearningBackend.GRAPHLEARN:
self.close_graphlearn_instance(object_id)
elif learning_backend == message_pb2.LearningBackend.GRAPHLEARN_TORCH:
self.close_graphlearn_torch_instance(object_id)
else:
raise ValueError("invalid learning backend")
def close_graphlearn_instance(self, object_id):
if self._deploy_mode == "lazy":
self.delete_graphlearn_engine(object_id)
return
if object_id not in self._graphlearn_instance_processes:
return
# delete the services
target = self._graphlearn_services[object_id]
try:
delete_kubernetes_object(
api_client=self._api_client,
target=target,
wait=self._waiting_for_delete,
timeout_seconds=self._timeout_seconds,
)
except Exception: # pylint: disable=broad-except
logger.exception("Failed to delete graphlearn service for %s", object_id)
# terminate the process
for proc in self._graphlearn_instance_processes[object_id]:
try:
proc.terminate()
proc.wait(1)
except Exception: # pylint: disable=broad-except
logger.exception("Failed to terminate graphlearn server")
self._graphlearn_instance_processes[object_id].clear()
def close_graphlearn_torch_instance(self, object_id):
if self._deploy_mode == "lazy":
self.delete_graphlearn_torch_engine(object_id)
return
if object_id not in self._graphlearn_torch_instance_processes:
return
# delete the services
target = self._graphlearn_torch_services[object_id]
try:
delete_kubernetes_object(
api_client=self._api_client,
target=target,
wait=self._waiting_for_delete,
timeout_seconds=self._timeout_seconds,
)
except Exception: # pylint: disable=broad-except
logger.exception(
"Failed to delete graphlearn torch service for %s", object_id
)
# terminate the process
for proc in self._graphlearn_torch_instance_processes[object_id]:
try:
proc.terminate()
proc.wait(1)
except Exception: # pylint: disable=broad-except
logger.exception("Failed to terminate graphlearn torch server")
self._graphlearn_torch_instance_processes[object_id].clear()
def close_graphlearn_torch_client(self, group, name, version, plural, namespace):
# clear PyTorchJob
logger.info(f"Deleting PyTorchJob {name}...")
try:
response = self._pytorchjobs_api.delete_namespaced_custom_object(
group=group,
name=name,
version=version,
plural=plural,
namespace=namespace,
body=kube_client.V1DeleteOptions(
propagation_policy="Foreground",
),
)
logger.info(f"PyTorchJob {name} deleted. Response: {response}")
except K8SApiException as e:
logger.info(
f"Exception when calling CustomObjectsApi->delete_namespaced_custom_object: {e}"
)
try:
response = self._core_api.delete_namespaced_config_map(
name="graphlearn-torch-client-config",
namespace=self._namespace,
)
logger.info(
f"ConfigMap graphlearn-torch-client-config deleted. Response: {response}"
)
except K8SApiException as e:
logger.info(
f"Exception when calling CoreV1Api->delete_namespaced_config_map: {e}"
)
class ResourceManager(object):
"""A class to manager kubernetes object.
Object managed by this class will dump meta info to disk file
for pod preStop lifecycle management.
meta info format:
{
"my-deployment": "Deployment",
"my-service": "Service"
}
"""
_resource_object_path = os.path.join(get_tempdir(), "resource_object") # fixed
def __init__(self, api_client):
self._api_client = api_client
self._resource_object = []
self._meta_info = {}
def append(self, target):
self._resource_object.append(target)
self._meta_info.update(
get_kubernetes_object_info(api_client=self._api_client, target=target)
)
self.dump()
def extend(self, targets):
self._resource_object.extend(targets)
for target in targets:
self._meta_info.update(
get_kubernetes_object_info(api_client=self._api_client, target=target)
)
self.dump()
def clear(self):
self._resource_object.clear()
self._meta_info.clear()
def __str__(self):
return str(self._meta_info)
def __getitem__(self, index):
return self._resource_object[index]
def dump(self, extra_resource=None):
"""Dump meta info to disk file.
Args:
extra_resource (dict): extra resource to dump.
A typical scenario is dumping meta info of namespace
for coordinator dangling processing.
"""
if extra_resource is not None:
rlt = copy.deepcopy(self._meta_info)
rlt.update(extra_resource)
else:
rlt = self._meta_info
with open(self._resource_object_path, "w") as f:
json.dump(rlt, f)
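# Illustrative usage of ResourceManager (a sketch; the deployment object and
# names below are assumed):
#
#   rm = ResourceManager(api_client)
#   rm.append(created_deployment)  # records {"<name>": "Deployment"} and dumps it
#   rm.dump(extra_resource={"my-namespace": "Namespace"})  # include the namespace
#                                                          # for dangling cleanup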