python/graphscope/deploy/kubernetes/cluster.py (340 lines of code) (raw):

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import base64
import copy
import logging
import os
import queue
import random
import time

from kubernetes import client as kube_client
from kubernetes.client import CoreV1Api
from kubernetes.client.rest import ApiException as K8SApiException

from graphscope.config import Config
from graphscope.deploy.kubernetes.resource_builder import CoordinatorDeployment
from graphscope.deploy.kubernetes.resource_builder import ResourceBuilder
from graphscope.deploy.kubernetes.utils import KubernetesPodWatcher
from graphscope.deploy.kubernetes.utils import delete_kubernetes_object
from graphscope.deploy.kubernetes.utils import get_service_endpoints
from graphscope.deploy.kubernetes.utils import try_to_read_namespace_from_context
from graphscope.deploy.kubernetes.utils import wait_for_deployment_complete
from graphscope.deploy.launcher import Launcher
from graphscope.framework.utils import random_string
from graphscope.version import __version__

logger = logging.getLogger("graphscope")


class KubernetesClusterLauncher(Launcher):
    """Class for setting up GraphScope instance on kubernetes cluster.

    The launcher creates (if needed) a namespace, RBAC objects for the
    ``default`` service account, and a coordinator Deployment plus Service.
    Every object it creates is recorded in ``self._resource_object`` so that
    :meth:`stop` can delete it again.
    """

    _coordinator_name_prefix = "coordinator-"

    _role_name_prefix = "gs-reader-"
    _role_binding_name_prefix = f"{_role_name_prefix}binding-"
    _cluster_role_name_prefix = "gs-cluster-reader-"
    _cluster_role_binding_name_prefix = f"{_cluster_role_name_prefix}binding-"

    _url_pattern = r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'\".,<>?«»“”‘’]))"  # noqa: E501
    _endpoint_pattern = r"(?:http.*://)?(?P<host>[^:/ ]+).?(?P<port>[0-9]*).*"

    _coordinator_container_name = "coordinator"
    _coordinator_service_port_name = "coordinator"

    def __init__(
        self,
        config: Config,
        api_client: kube_client.ApiClient,
    ):
        """Prepare the launcher; no cluster objects are created here.

        Args:
            config (Config): Session configuration. It is deep-copied, and the
                copy is updated with the coordinator deployment name and the
                resolved namespace.
            api_client (kube_client.ApiClient): Client used for every
                Kubernetes API call.
        """
        super().__init__()
        self._config = copy.deepcopy(config)
        self._api_client = api_client
        self._core_api = kube_client.CoreV1Api(api_client)
        self._app_api = kube_client.AppsV1Api(api_client)
        self._rbac_api = kube_client.RbacAuthorizationV1Api(api_client)

        self._service_type = config.kubernetes_launcher.service_type

        self._registry = config.kubernetes_launcher.image.registry
        self._repository = config.kubernetes_launcher.image.repository
        self._tag = config.kubernetes_launcher.image.tag
        self._image_pull_policy = config.kubernetes_launcher.image.pull_policy
        self._image_pull_secrets = config.kubernetes_launcher.image.pull_secrets

        self._instance_id = config.session.instance_id

        self._role_name = self._role_name_prefix + self._instance_id
        self._role_binding_name = self._role_binding_name_prefix + self._instance_id
        # Cluster-scoped names depend on the namespace; filled in by
        # ``_create_role_and_binding``.
        self._cluster_role_name = ""
        self._cluster_role_binding_name = ""

        # all resource created inside namespace
        self._resource_object = []

        self._coordinator_name = self._coordinator_name_prefix + self._instance_id
        self._coordinator_service_name = self._coordinator_name
        self._config.coordinator.deployment_name = self._coordinator_name

        # Namespace resolution order: explicit config, then kube context,
        # then a freshly generated unused name.
        self._namespace = config.kubernetes_launcher.namespace
        if self._namespace is None:
            self._namespace = try_to_read_namespace_from_context()
            # Doesn't have any namespace info in kube context.
            if self._namespace is None:
                self._namespace = self._get_free_namespace()
        self._config.kubernetes_launcher.namespace = self._namespace

        self._closed = False

        # pods watcher
        self._coordinator_pods_watcher = None
        self._logs = []

        self._labels = {
            "app.kubernetes.io/name": "graphscope",
            "app.kubernetes.io/instance": self._instance_id,
            "app.kubernetes.io/version": __version__,
            "app.kubernetes.io/component": "coordinator",
        }

    def __del__(self):
        """Best-effort cleanup of the cluster objects on garbage collection."""
        # ``_closed`` is assigned only after every attribute ``stop`` reads has
        # been set; if ``__init__`` raised earlier there is nothing to clean
        # up, and calling ``stop`` would only raise AttributeError.
        if not hasattr(self, "_closed"):
            return
        self.stop()

    def poll(self):
        """Check the coordinator pod status, 0 for success."""
        return 0

    def get_namespace(self):
        """Get kubernetes namespace which graphscope instance running on.

        Returns:
            str: Kubernetes namespace.
        """
        return self._namespace

    def type(self):
        """Return the launcher type identifier, ``"k8s"``."""
        return "k8s"

    def _get_free_namespace(self):
        """Generate ``gs-<6 random chars>`` names until one does not exist yet."""
        while True:
            namespace = "gs-" + random_string(6)
            if not self._namespace_exist(namespace):
                return namespace

    def _resource_exist(self, func, *args):
        """Return whether ``func(*args)`` finds the resource.

        A 404 from the API means "does not exist"; any other API error is
        re-raised.
        """
        try:
            func(*args)
        except K8SApiException as e:
            if e.status != 404:  # Not found
                raise
            return False
        return True

    def _namespace_exist(self, namespace):
        return self._resource_exist(self._core_api.read_namespace, namespace)

    def _role_exist(self, namespace, role):
        return self._resource_exist(
            self._rbac_api.read_namespaced_role, role, namespace
        )

    def _cluster_role_exist(self, cluster_role):
        return self._resource_exist(self._rbac_api.read_cluster_role, cluster_role)

    def _role_binding_exist(self, namespace, role_binding):
        return self._resource_exist(
            self._rbac_api.read_namespaced_role_binding, role_binding, namespace
        )

    def _cluster_role_binding_exist(self, cluster_role_binding):
        return self._resource_exist(
            self._rbac_api.read_cluster_role_binding, cluster_role_binding
        )

    def _create_namespace(self):
        """Create the namespace if missing and mark it for deletion on stop."""
        if not self._namespace_exist(self._namespace):
            namespace = ResourceBuilder.get_namespace(self._namespace)
            self._core_api.create_namespace(namespace)
            # We created it, so we own it: ``stop`` deletes it again.
            self._config.kubernetes_launcher.delete_namespace = True

    def _create_role_and_binding(self):
        """Grant the namespace's ``default`` service account the RBAC it needs.

        Always ensures a namespaced Role/RoleBinding. When the namespace is to
        be deleted on stop, also ensures a ClusterRole/ClusterRoleBinding over
        ``namespaces``. Only objects created here are recorded for deletion.
        """
        self._cluster_role_name = self._cluster_role_name_prefix + self._namespace
        self._cluster_role_binding_name = (
            self._cluster_role_binding_name_prefix + self._namespace
        )
        # create a role and bind to default service account.
        targets = []
        if not self._role_exist(namespace=self._namespace, role=self._role_name):
            role = ResourceBuilder.get_role(
                name=self._role_name,
                namespace=self._namespace,
                api_groups=",apps,extensions,kubeflow.org",  # The leading comma is necessary, represents for core api group.
                resources="configmaps,deployments,deployments/status,statefulsets,statefulsets/status,endpoints,events,pods,pods/log,pods/exec,pods/status,services,replicasets,pytorchjobs",  # noqa: E501
                verbs="create,delete,get,update,watch,list",
                labels=self._labels,
            )
            ret = self._rbac_api.create_namespaced_role(self._namespace, role)
            targets.append(ret)

        if not self._role_binding_exist(self._namespace, self._role_binding_name):
            role_binding = ResourceBuilder.get_role_binding(
                name=self._role_binding_name,
                namespace=self._namespace,
                role_name=self._role_name,
                service_account_name="default",
                labels=self._labels,
            )
            ret = self._rbac_api.create_namespaced_role_binding(
                self._namespace, role_binding
            )
            targets.append(ret)

        if self._config.kubernetes_launcher.delete_namespace:
            # Create clusterRole to delete namespace.
            if not self._cluster_role_exist(cluster_role=self._cluster_role_name):
                cluster_role = ResourceBuilder.get_cluster_role(
                    name=self._cluster_role_name,
                    # ``namespaces`` belong to the core API group (""); the
                    # leading comma adds it, as for the Role above. Granting
                    # only "apps" made this rule match nothing.
                    api_groups=",apps",
                    resources="namespaces",
                    verbs="create,delete,get,update,watch,list",
                    labels=self._labels,
                )
                ret = self._rbac_api.create_cluster_role(cluster_role)
                targets.append(ret)

            if not self._cluster_role_binding_exist(
                cluster_role_binding=self._cluster_role_binding_name
            ):
                cluster_role_binding = ResourceBuilder.get_cluster_role_binding(
                    name=self._cluster_role_binding_name,
                    namespace=self._namespace,
                    role_name=self._cluster_role_name,
                    service_account_name="default",
                    labels=self._labels,
                )
                ret = self._rbac_api.create_cluster_role_binding(cluster_role_binding)
                targets.append(ret)

        self._resource_object.extend(targets)

    def _create_coordinator(self):
        """Create the coordinator Deployment and its Service.

        Both created objects are recorded for deletion in :meth:`stop`.
        """
        logger.info("Launching coordinator...")
        targets = []

        env = {
            "PYTHONUNBUFFERED": "TRUE",
            "KUBE_NAMESPACE": self._namespace,
            "INSTANCE_ID": self._instance_id,
            "GREMLIN_EXPOSE": self._service_type,
        }
        # Forward the API server address to the coordinator when the caller set it.
        if "KUBE_API_ADDRESS" in os.environ:
            env["KUBE_API_ADDRESS"] = os.environ["KUBE_API_ADDRESS"]

        if self._registry:
            image_prefix = f"{self._registry}/{self._repository}"
        else:
            image_prefix = self._repository
        image = f"{image_prefix}/coordinator:{self._tag}"

        args = self._get_coordinator_args()

        host_network = "ENABLE_HOST_NETWORK" in os.environ
        node_selector = self._config.coordinator.node_selector
        port = self._config.coordinator.service_port

        coordinator = CoordinatorDeployment(
            namespace=self._namespace,
            name=self._coordinator_name,
            image=image,
            args=args,
            labels=self._labels,
            image_pull_secret=self._image_pull_secrets,
            image_pull_policy=self._image_pull_policy,
            node_selector=node_selector,
            env=env,
            host_network=host_network,
            port=port,
        )
        deployment = coordinator.get_coordinator_deployment()

        response = self._app_api.create_namespaced_deployment(
            self._namespace, deployment
        )
        targets.append(response)

        # create coordinator service
        service = coordinator.get_coordinator_service(
            service_type=self._service_type, port=port
        )
        response = self._core_api.create_namespaced_service(self._namespace, service)
        targets.append(response)

        self._resource_object.extend(targets)

    def base64_encode(self, string):
        """Return ``string`` UTF-8 encoded, base64 encoded, as a ``str``."""
        return base64.b64encode(string.encode("utf-8")).decode("utf-8", errors="ignore")

    def _get_coordinator_args(self):
        """Build the coordinator command line.

        The full launcher config is passed as base64-encoded JSON.
        """
        args = [
            "python3",
            "-m",
            "gscoordinator",
            "--config",
            self.base64_encode(self._config.dumps_json()),
        ]
        return args

    def _create_services(self):
        self._create_coordinator()

    def _waiting_for_services_ready(self):
        """Block until the coordinator deployment has completed.

        Finds the pod selected by the coordinator deployment, starts a
        ``KubernetesPodWatcher`` on its coordinator container, and waits via
        ``wait_for_deployment_complete`` (bounded by ``session.timeout_seconds``).

        Raises:
            RuntimeError: If the deployment does not select exactly one pod.
        """
        response = self._app_api.read_namespaced_deployment_status(
            namespace=self._namespace, name=self._coordinator_name
        )

        # get deployment pods
        match_labels = response.spec.selector.match_labels
        selector = ",".join([f"{k}={v}" for k, v in match_labels.items()])
        pods = self._core_api.list_namespaced_pod(
            namespace=self._namespace, label_selector=selector
        )
        # Explicit check instead of ``assert`` so it still runs under ``python -O``.
        if len(pods.items) != 1:
            raise RuntimeError(
                "coordinator deployment should have only one pod, "
                f"found {len(pods.items)}"
            )
        pod = pods.items[0]

        self._coordinator_pods_watcher = KubernetesPodWatcher(
            api_client=self._api_client,
            namespace=self._namespace,
            pod=pod,
            container=self._coordinator_container_name,
        )
        self._coordinator_pods_watcher.start()

        # NOTE(review): the watcher is stopped here only on a truthy return.
        # If the wait raises, ``start`` drains and stops the watcher via
        # ``_dump_coordinator_failed_status``.
        if wait_for_deployment_complete(
            api_client=self._api_client,
            namespace=self._namespace,
            name=self._coordinator_name,
            pods_watcher=self._coordinator_pods_watcher,
            timeout_seconds=self._config.session.timeout_seconds,
        ):
            self._coordinator_pods_watcher.stop()

    def _try_to_get_coordinator_service_from_configmap(self):
        """Poll the ``gs-coordinator-<instance_id>`` ConfigMap for the endpoint.

        Retries once per second until the ConfigMap can be read or
        ``session.timeout_seconds`` has elapsed.

        Returns:
            str: ``"<ip>:<port>"`` built from the ConfigMap's ``ip`` and ``port`` keys.

        Raises:
            TimeoutError: If the ConfigMap could not be read before the timeout.
        """
        config_map_name = f"gs-coordinator-{self._instance_id}"
        start_time = time.time()
        last_error = None
        while True:
            try:
                response = self._core_api.read_namespaced_config_map(
                    name=config_map_name, namespace=self._namespace
                )
                return f"{response.data['ip']}:{response.data['port']}"
            except K8SApiException as e:
                # Presumably not published by the coordinator yet; keep polling.
                last_error = e
            time.sleep(1)
            if time.time() - start_time > self._config.session.timeout_seconds:
                raise TimeoutError(
                    "Get coordinator service from configmap timeout"
                ) from last_error

    def _get_coordinator_endpoint(self):
        """Return the coordinator endpoint.

        The endpoint comes from the ConfigMap when no service type is
        configured; otherwise it is the first endpoint of the coordinator
        Service.
        """
        if self._service_type is None:
            # try to get endpoint from configmap
            return self._try_to_get_coordinator_service_from_configmap()
        # Always len(endpoints) >= 1
        endpoints = get_service_endpoints(
            api_client=self._api_client,
            namespace=self._namespace,
            name=self._coordinator_service_name,
            service_type=self._service_type,
        )
        return endpoints[0]

    def _dump_coordinator_failed_status(self):
        """Log every message queued by the pod watcher, then stop and drop it."""
        # Dump failed status even show_log is False
        if self._coordinator_pods_watcher is None:
            return
        while True:
            try:
                message = self._coordinator_pods_watcher.poll(timeout_seconds=3)
                logger.error(message, extra={"simple": True})
            except queue.Empty:
                break
        self._coordinator_pods_watcher.stop()
        self._coordinator_pods_watcher = None

    def start(self):
        """Launch graphscope instance on kubernetes cluster.

        On any failure the watcher output is dumped, every created resource is
        removed via :meth:`stop`, and the original exception is re-raised.

        Raises:
            RuntimeError: If instance launch failed or timeout.

        Returns:
            str: Coordinator service endpoint.
        """
        try:
            self._create_namespace()
            self._create_role_and_binding()

            self._create_services()
            # Short pause before querying the deployment status.
            time.sleep(1)
            self._waiting_for_services_ready()
            self._coordinator_endpoint = self._get_coordinator_endpoint()
            logger.info(
                "Coordinator pod start successful with address %s, connecting to service ...",
                self._coordinator_endpoint,
            )
        except Exception:
            time.sleep(1)
            self._dump_coordinator_failed_status()
            self.stop()
            raise
        return self._coordinator_endpoint

    def stop(self, wait=False):
        """Stop graphscope instance on kubernetes cluster.

        Deletes every resource recorded by this launcher in reverse creation
        order, so RBAC objects are removed last. It then deletes the namespace
        when ``kubernetes_launcher.delete_namespace`` is set. Calling it again
        after a successful run only repeats the log lines.

        Args:
            wait (bool): Wait for each resource deletion to finish. Deletion
                also waits when ``kubernetes_launcher.waiting_for_delete`` is
                set in the config.

        Raises:
            TimeoutError: Waiting for stop instance timeout when ``wait`` or
                ``_waiting_for_delete`` is True.
        """
        # delete resources created by graphscope inside namespace
        # make sure delete permission resources in the end
        logger.info("Stopping coordinator")
        wait_for_delete = wait or self._config.kubernetes_launcher.waiting_for_delete
        for target in reversed(self._resource_object):
            delete_kubernetes_object(
                api_client=self._api_client,
                target=target,
                wait=wait_for_delete,
                timeout_seconds=self._config.session.timeout_seconds,
            )
        self._resource_object = []
        if self._config.kubernetes_launcher.delete_namespace:
            # delete namespace
            try:
                self._core_api.delete_namespace(self._namespace)
                self._config.kubernetes_launcher.delete_namespace = False
            except K8SApiException as e:
                if e.status == 404:  # namespace already deleted.
                    pass
                else:
                    raise
        logger.info("Stopped coordinator")


if __name__ == "__main__":
    from kubernetes import config as kube_config

    kube_config.load_kube_config()
    client = kube_client.ApiClient()

    config = Config()
    config.kubernetes_launcher.namespace = "demo"
    config.kubernetes_launcher.service_type = "NodePort"
    config.session.num_workers = 2

    launcher = KubernetesClusterLauncher(
        config=config,
        api_client=client,
    )
    # ``start`` already cleans up after itself if it fails.
    launcher.start()
    try:
        print(launcher._get_coordinator_endpoint())
    finally:
        launcher.stop()