python/graphscope/deploy/kubernetes/resource_builder.py (351 lines of code) (raw):

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import sys

from kubernetes import client as kube_client

from graphscope.deploy.kubernetes.utils import parse_readable_memory
from graphscope.framework.utils import get_tempdir

logger = logging.getLogger("graphscope")


class ResourceBuilder:
    """Collection of static factories that assemble ``kubernetes.client`` models.

    Every method only builds and returns an in-memory model object; nothing in
    this class talks to the Kubernetes API server.
    """

    @staticmethod
    def get_configmap(name, kvs):
        """Return a ``V1ConfigMap`` called *name* whose ``data`` is *kvs*."""
        metadata = kube_client.V1ObjectMeta(name=name)
        configmap = kube_client.V1ConfigMap(metadata=metadata, data=kvs)
        return configmap

    @staticmethod
    def get_role(name, namespace, api_groups, resources, verbs, labels):
        """Return a namespaced ``V1Role`` holding a single policy rule.

        ``api_groups``, ``resources`` and ``verbs`` are comma-separated strings;
        each one is split on ``","`` to form the rule's lists.
        """
        metadata = kube_client.V1ObjectMeta(name=name, namespace=namespace)
        metadata.labels = labels
        rule = kube_client.V1PolicyRule(
            api_groups=api_groups.split(","),
            resources=resources.split(","),
            verbs=verbs.split(","),
        )
        role = kube_client.V1Role(metadata=metadata, rules=[rule])
        return role

    @staticmethod
    def get_cluster_role(name, api_groups, resources, verbs, labels):
        """Return a ``V1ClusterRole`` holding a single policy rule.

        ``api_groups``, ``resources`` and ``verbs`` are comma-separated strings;
        each one is split on ``","`` to form the rule's lists.
        """
        metadata = kube_client.V1ObjectMeta(name=name, labels=labels)
        rule = kube_client.V1PolicyRule(
            api_groups=api_groups.split(","),
            resources=resources.split(","),
            verbs=verbs.split(","),
        )
        role = kube_client.V1ClusterRole(metadata=metadata, rules=[rule])
        return role

    @staticmethod
    def get_role_binding(name, namespace, role_name, service_account_name, labels):
        """Return a ``V1RoleBinding`` tying a service account to the ``Role`` *role_name*."""
        metadata = kube_client.V1ObjectMeta(name=name, namespace=namespace)
        metadata.labels = labels
        role_ref = kube_client.V1RoleRef(
            kind="Role", name=role_name, api_group="rbac.authorization.k8s.io"
        )
        subject = kube_client.RbacV1Subject(
            kind="ServiceAccount", name=service_account_name, namespace=namespace
        )
        role_binding = kube_client.V1RoleBinding(
            metadata=metadata, role_ref=role_ref, subjects=[subject]
        )
        return role_binding

    @staticmethod
    def get_cluster_role_binding(
        name, namespace, role_name, service_account_name, labels
    ):
        """Return a ``V1ClusterRoleBinding`` for the ``ClusterRole`` *role_name*.

        *namespace* is only used to qualify the service-account subject; the
        binding's own metadata carries no namespace.
        """
        metadata = kube_client.V1ObjectMeta(name=name, labels=labels)
        role_ref = kube_client.V1RoleRef(
            kind="ClusterRole", name=role_name, api_group="rbac.authorization.k8s.io"
        )
        subject = kube_client.RbacV1Subject(
            kind="ServiceAccount", name=service_account_name, namespace=namespace
        )
        role_binding = kube_client.V1ClusterRoleBinding(
            metadata=metadata, role_ref=role_ref, subjects=[subject]
        )
        return role_binding

    @staticmethod
    def get_tcp_probe(port, timeout=15, period=10, failure_threshold=8):
        """Return a ``V1Probe`` that performs a TCP socket check against *port*."""
        return kube_client.V1Probe(
            tcp_socket=kube_client.V1TCPSocketAction(port=port),
            timeout_seconds=timeout,
            period_seconds=period,
            failure_threshold=failure_threshold,
        )

    @staticmethod
    def get_exec_action(command):
        """Return a ``V1ExecAction`` wrapping *command*."""
        return kube_client.V1ExecAction(command=command)

    @staticmethod
    def get_lifecycle_handler(_exec=None, http_get=None, tcp_socket=None):
        """Return a ``V1LifecycleHandler`` built from the given action(s)."""
        handler = kube_client.V1LifecycleHandler(
            _exec=_exec, http_get=http_get, tcp_socket=tcp_socket
        )
        return handler

    @staticmethod
    def get_lifecycle(post_start=None, pre_stop=None):
        """Return a ``V1Lifecycle`` with the given post-start / pre-stop handlers."""
        return kube_client.V1Lifecycle(post_start=post_start, pre_stop=pre_stop)

    @staticmethod
    def get_image_pull_secrets(image_pull_secrets):
        """Return one ``V1LocalObjectReference`` per name in *image_pull_secrets*.

        Args:
            image_pull_secrets: Iterable of secret names.

        Returns:
            A list of ``V1LocalObjectReference`` objects, in input order.
        """
        return [
            kube_client.V1LocalObjectReference(name=name)
            for name in image_pull_secrets
        ]

    @staticmethod
    def get_node_selector(node_selector):
        """Decode a base64-encoded JSON document and return the parsed value.

        Args:
            node_selector: Base64 text (or bytes) whose decoded payload is JSON.

        Returns:
            Whatever ``json.loads`` yields for the decoded payload.
        """
        import base64
        import json

        # Bytes that are not valid UTF-8 are dropped (errors="ignore") rather
        # than raising; the remaining text must still be valid JSON.
        decoded_node_selector = base64.b64decode(node_selector).decode(
            "utf-8", errors="ignore"
        )
        return json.loads(decoded_node_selector)

    @staticmethod
    def get_user_defined_volumes(udf_volumes):
        """Translate a user-supplied volume description into Kubernetes models.

        Expected input layout:

        .. code:: python

            {
                name: {
                    "type": "",
                    "field": {},  # the keys are subject to volume type
                    "mounts": [ {"mountPath": "", "subPath": ""}, ... ]
                }
            }

        Supported ``type`` values are ``hostPath``, ``emptyDir``,
        ``persistentVolumeClaim``, ``configMap`` and ``secret``. ``mounts`` may
        be a single mount dict instead of a list. Each mount may also carry an
        optional ``readOnly`` key.

        Args:
            udf_volumes: Mapping described above; a falsy value yields three
                empty lists.

        Returns:
            A tuple ``(volumes, source_volume_mounts, destination_volume_mounts)``.
            The two mount lists hold the same ``V1VolumeMount`` objects but are
            distinct list objects.

        Raises:
            ValueError: If a volume declares an unsupported ``type``.
            KeyError: If a key the selected volume type indexes directly
                (e.g. ``type``, ``mounts``, ``mountPath``, ``path``) is absent.
        """
        if not udf_volumes:
            return [], [], []
        volumes, source_volume_mounts = [], []
        for name, value in udf_volumes.items():
            volume = kube_client.V1Volume(name=name)
            field = value.get("field", {})
            if value["type"] == "hostPath":
                volume.host_path = kube_client.V1HostPathVolumeSource(
                    path=field["path"]
                )
                if "type" in field:
                    volume.host_path.type = field["type"]
            elif value["type"] == "emptyDir":
                volume.empty_dir = kube_client.V1EmptyDirVolumeSource()
                if "medium" in field:
                    volume.empty_dir.medium = field["medium"]
                if "sizeLimit" in field:
                    volume.empty_dir.size_limit = field["sizeLimit"]
            elif value["type"] == "persistentVolumeClaim":
                pvc = kube_client.V1PersistentVolumeClaimVolumeSource(
                    claim_name=field["claimName"]
                )
                volume.persistent_volume_claim = pvc
                if "readOnly" in field:
                    volume.persistent_volume_claim.read_only = field["readOnly"]
            elif value["type"] == "configMap":
                volume.config_map = kube_client.V1ConfigMapVolumeSource(
                    name=field["name"]
                )
            elif value["type"] == "secret":
                volume.secret = kube_client.V1SecretVolumeSource(
                    secret_name=field["name"]
                )
            else:
                raise ValueError(f"Unsupported volume type: {value['type']}")

            volume_mounts = []
            mounts_list = value["mounts"]
            # Accept a single mount dict as shorthand for a one-element list.
            if not isinstance(mounts_list, list):
                mounts_list = [value["mounts"]]
            for udf_mount in mounts_list:
                volume_mount = kube_client.V1VolumeMount(
                    name=name, mount_path=udf_mount["mountPath"]
                )
                if "subPath" in udf_mount:
                    volume_mount.sub_path = udf_mount["subPath"]
                if "readOnly" in udf_mount:
                    volume_mount.read_only = udf_mount["readOnly"]
                volume_mounts.append(volume_mount)

            volumes.append(volume)
            source_volume_mounts.extend(volume_mounts)

        # Assume destination mounts are the same as source mounts. Hand back a
        # separate list so that a caller appending to one does not silently
        # change the other (the empty-input path already returns distinct lists).
        destination_volume_mounts = list(source_volume_mounts)
        return volumes, source_volume_mounts, destination_volume_mounts

    @staticmethod
    def get_resources(requests, limits):
        """Return ``V1ResourceRequirements``; ``None`` arguments are left unset."""
        resource_requirements = kube_client.V1ResourceRequirements()
        if requests is not None:
            resource_requirements.requests = requests
        if limits is not None:
            resource_requirements.limits = limits
        return resource_requirements

    @staticmethod
    def get_pod_spec(
        containers: [kube_client.V1Container],
        image_pull_secrets=None,
        node_selector=None,
        volumes=None,
    ):
        """Return a ``V1PodSpec`` for *containers*.

        Args:
            containers: Containers of the pod.
            image_pull_secrets: Optional iterable of secret names, converted
                with :meth:`get_image_pull_secrets`.
            node_selector: Optional base64-encoded JSON, decoded with
                :meth:`get_node_selector`.
            volumes: Optional list of volumes assigned to the spec as-is.

        Optional arguments that are ``None`` or empty are not set on the spec.
        """
        pod_spec = kube_client.V1PodSpec(containers=containers)
        if image_pull_secrets:
            pod_spec.image_pull_secrets = ResourceBuilder.get_image_pull_secrets(
                image_pull_secrets
            )
        if node_selector:
            pod_spec.node_selector = ResourceBuilder.get_node_selector(node_selector)
        if volumes:
            pod_spec.volumes = volumes
        return pod_spec

    @staticmethod
    def get_pod_template_spec(
        spec: kube_client.V1PodSpec,
        labels: dict,
        annotations=None,
        default_container=None,
    ):
        """Return a ``V1PodTemplateSpec`` wrapping *spec*.

        Args:
            spec: Pod spec placed in the template.
            labels: Labels for the template metadata.
            annotations: Optional annotations; the mapping passed in is copied
                and never modified.
            default_container: If given, recorded under the
                ``kubectl.kubernetes.io/default-container`` annotation.
        """
        pod_template_spec = kube_client.V1PodTemplateSpec()
        pod_template_spec.spec = spec
        # Work on a private copy so adding the default-container key below does
        # not leak into a dict the caller may reuse for other templates.
        annotations = dict(annotations) if annotations is not None else {}
        if default_container is not None:
            annotations["kubectl.kubernetes.io/default-container"] = default_container
        pod_template_spec.metadata = kube_client.V1ObjectMeta(
            labels=labels, annotations=annotations
        )
        return pod_template_spec

    @staticmethod
    def get_deployment_spec(template, replicas, labels):
        """Return a ``V1DeploymentSpec`` selecting pods by *labels*."""
        selector = kube_client.V1LabelSelector(match_labels=labels)
        spec = kube_client.V1DeploymentSpec(selector=selector, template=template)
        spec.replicas = replicas
        return spec

    @staticmethod
    def get_deployment(namespace, name, spec, labels):
        """Return an ``apps/v1`` ``V1Deployment`` with the given metadata and spec."""
        deployment = kube_client.V1Deployment()
        deployment.api_version = "apps/v1"
        deployment.kind = "Deployment"
        deployment.metadata = kube_client.V1ObjectMeta(
            name=name, labels=labels, namespace=namespace
        )
        deployment.spec = spec
        return deployment

    @staticmethod
    def get_stateful_set_spec(template, replicas, labels, service_name):
        """Return a ``V1StatefulSetSpec`` selecting pods by *labels*."""
        selector = kube_client.V1LabelSelector(match_labels=labels)
        spec = kube_client.V1StatefulSetSpec(
            selector=selector, template=template, service_name=service_name
        )
        spec.replicas = replicas
        return spec

    @staticmethod
    def get_stateful_set(namespace, name, spec, labels):
        """Return an ``apps/v1`` ``V1StatefulSet`` with the given metadata and spec."""
        statefulset = kube_client.V1StatefulSet()
        statefulset.api_version = "apps/v1"
        statefulset.kind = "StatefulSet"
        statefulset.metadata = kube_client.V1ObjectMeta(
            name=name, labels=labels, namespace=namespace
        )
        statefulset.spec = spec
        return statefulset

    @staticmethod
    def get_value_from_field_ref(name, field_path):
        """Return a ``V1EnvVar`` named *name* sourced from the field *field_path*."""
        env = kube_client.V1EnvVar(name=name)
        value_from = kube_client.V1EnvVarSource()
        value_from.field_ref = kube_client.V1ObjectFieldSelector(field_path=field_path)
        env.value_from = value_from
        return env

    @staticmethod
    def get_namespace(name):
        """Return a ``V1Namespace`` labelled ``kubernetes.io/metadata.name=<name>``."""
        namespace = kube_client.V1Namespace()
        namespace.metadata = kube_client.V1ObjectMeta(name=name)
        namespace.metadata.labels = {"kubernetes.io/metadata.name": name}
        return namespace

    @staticmethod
    def get_service_spec(type, ports, labels, external_traffic_policy):
        """Return a ``V1ServiceSpec`` selecting pods by *labels*.

        ``external_traffic_policy`` is only set when it is not ``None``.
        """
        # NOTE: the parameter name ``type`` shadows the builtin; it is kept
        # unchanged so existing keyword callers continue to work.
        service_spec = kube_client.V1ServiceSpec()
        service_spec.type = type
        service_spec.selector = labels
        service_spec.ports = ports
        if external_traffic_policy is not None:
            service_spec.external_traffic_policy = external_traffic_policy
        return service_spec

    @staticmethod
    def get_service(namespace, name, service_spec, labels, annotations=None):
        """Return a ``v1`` ``V1Service`` with the given metadata and spec."""
        service = kube_client.V1Service()
        service.api_version = "v1"
        service.kind = "Service"
        service.spec = service_spec
        metadata = kube_client.V1ObjectMeta(
            namespace=namespace, name=name, labels=labels, annotations=annotations
        )
        service.metadata = metadata
        return service


class CoordinatorDeployment:
    """Builds the Deployment and Service models for the coordinator.

    The deployment always has one replica and one container named
    ``coordinator`` with fixed CPU/memory requests and limits.
    """

    # Extra port declared on the container (when a main port is configured)
    # and exposed by the service under the name "debug".
    _DEBUG_PORT = 8000

    def __init__(
        self,
        namespace,
        name,
        image,
        args,
        labels,
        image_pull_secret,
        image_pull_policy,
        node_selector,
        env,
        host_network,
        port=None,
    ):
        """Store the settings used by the ``get_coordinator_*`` builders.

        Args:
            namespace: Namespace for the deployment and service metadata.
            name: Name for the deployment and service metadata.
            image: Container image.
            args: Container arguments.
            labels: Labels used for metadata and selectors.
            image_pull_secret: Iterable of secret names handed to
                ``ResourceBuilder.get_pod_spec`` as ``image_pull_secrets``.
            image_pull_policy: Image pull policy of the container.
            node_selector: Base64-encoded JSON handed to
                ``ResourceBuilder.get_pod_spec`` as ``node_selector``.
            env: Mapping of environment variable name to value.
            host_network: Value assigned to the pod spec's ``host_network``.
            port: Optional container port; when set it is also the target of
                the TCP readiness probe.
        """
        self._replicas = 1
        self._namespace = namespace
        self._name = name
        self._image = image
        self._args = args
        self._labels = labels
        self._image_pull_policy = image_pull_policy
        self._image_pull_secret = image_pull_secret
        self._env: dict = env
        self._port = port
        self._host_network = host_network
        self._node_selector = node_selector
        self._requests = {"cpu": 0.5, "memory": "512Mi"}
        self._limits = {"cpu": 0.5, "memory": "512Mi"}

    def get_lifecycle(self):
        """Return a ``V1Lifecycle`` whose pre-stop hook runs the prestop module."""
        pre_stop = ["python3", "-m", "gscoordinator.hook.prestop"]
        _exec = ResourceBuilder.get_exec_action(pre_stop)
        lifecycle_handler = ResourceBuilder.get_lifecycle_handler(_exec)
        lifecycle = ResourceBuilder.get_lifecycle(pre_stop=lifecycle_handler)
        return lifecycle

    def get_coordinator_container(self):
        """Return the ``coordinator`` ``V1Container``.

        Ports and the readiness probe are only attached when a port was
        configured on this instance.
        """
        resources = ResourceBuilder.get_resources(self._requests, self._limits)
        lifecycle = self.get_lifecycle()
        env = [
            kube_client.V1EnvVar(name=key, value=value)
            for key, value in self._env.items()
        ]
        container = kube_client.V1Container(
            name="coordinator",
            image=self._image,
            image_pull_policy=self._image_pull_policy,
            args=self._args,
            resources=resources,
            lifecycle=lifecycle,
            env=env,
        )
        if self._port is not None:
            container.ports = [
                kube_client.V1ContainerPort(container_port=self._port),
                kube_client.V1ContainerPort(container_port=self._DEBUG_PORT),
            ]
            container.readiness_probe = ResourceBuilder.get_tcp_probe(
                port=self._port, timeout=15, period=1, failure_threshold=20
            )
        return container

    def get_coordinator_pod_spec(self):
        """Return the pod spec holding the coordinator container."""
        container = self.get_coordinator_container()
        pod_spec = ResourceBuilder.get_pod_spec(
            containers=[container],
            image_pull_secrets=self._image_pull_secret,
            node_selector=self._node_selector,
        )
        pod_spec.host_network = self._host_network
        return pod_spec

    def get_coordinator_pod_template_spec(self):
        """Return the pod template, with ``coordinator`` as default container."""
        spec = self.get_coordinator_pod_spec()
        return ResourceBuilder.get_pod_template_spec(
            spec, self._labels, default_container="coordinator"
        )

    def get_coordinator_deployment_spec(self, replicas):
        """Return the deployment spec for *replicas* coordinator pods."""
        template = self.get_coordinator_pod_template_spec()
        spec = ResourceBuilder.get_deployment_spec(template, replicas, self._labels)
        return spec

    def get_coordinator_deployment(self):
        """Return the coordinator ``V1Deployment``."""
        spec = self.get_coordinator_deployment_spec(self._replicas)
        return ResourceBuilder.get_deployment(
            self._namespace, self._name, spec, self._labels
        )

    def get_coordinator_service(self, service_type, port):
        """Return the coordinator ``V1Service``.

        The service exposes *port* under the name ``coordinator`` and the
        debug port under the name ``debug``; no external traffic policy is set.
        """
        ports = [
            kube_client.V1ServicePort(name="coordinator", port=port),
            kube_client.V1ServicePort(name="debug", port=self._DEBUG_PORT),
        ]
        service_spec = ResourceBuilder.get_service_spec(
            service_type, ports, self._labels, None
        )
        service = ResourceBuilder.get_service(
            self._namespace, self._name, service_spec, self._labels
        )
        return service