python/graphscope/config.py (298 lines of code) (raw):

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""GraphScope default configuration."""

import base64
import json
from dataclasses import dataclass
from dataclasses import field
from typing import List
from typing import Union

from simple_parsing import ArgumentParser
from simple_parsing.helpers import Serializable
from simple_parsing.helpers import list_field

from graphscope.version import __version__

registry = "registry.cn-hongkong.aliyuncs.com"


@dataclass
class ResourceSpec:
    """Resource requirements for a container in kubernetes."""

    # CPU cores of container.
    cpu: Union[str, float, None] = None
    # Memory of container, suffix with ['Mi', 'Gi', 'Ti'].
    memory: Union[str, None] = None

    def as_dict(self):
        """Return the spec as a dict, leaving out every field that is None."""
        ret = {}
        if self.cpu is not None:
            ret["cpu"] = self.cpu
        if self.memory is not None:
            ret["memory"] = self.memory
        return ret


@dataclass
class ResourceConfig:
    """Resource spec for a container in kubernetes."""

    requests: ResourceSpec = None  # Resource requests of container.
    limits: ResourceSpec = None  # Resource limits of container.

    def get_requests(self):
        """Return the requests as a dict, or None when no requests are set."""
        return self.requests.as_dict() if self.requests is not None else None

    def get_limits(self):
        """Return the limits as a dict, or None when no limits are set."""
        return self.limits.as_dict() if self.limits is not None else None

    def set_cpu_request(self, cpu):
        """Set the CPU request, creating the requests spec if it is missing."""
        # `requests` is None by default and is reset to None by
        # EngineConfig.post_setup when `preemptive` is on, so it cannot be
        # dereferenced unconditionally.
        if self.requests is None:
            self.requests = ResourceSpec()
        self.requests.cpu = cpu
        # NOTE(review): mirroring the value into limits is disabled; presumably
        # to keep the container burstable -- confirm before re-enabling.
        # self.limits.cpu = cpu

    def set_mem_request(self, memory):
        """Set the memory request, creating the requests spec if it is missing."""
        # Same None guard as set_cpu_request.
        if self.requests is None:
            self.requests = ResourceSpec()
        self.requests.memory = memory
        # self.limits.memory = memory

    @staticmethod
    def make_burstable(cpu, memory):
        """Get default resource config for a container in kubernetes.

        Only `requests` is populated; `limits` keeps its default of None.
        """
        return ResourceConfig(
            requests=ResourceSpec(cpu=cpu, memory=memory),
            # limits=ResourceSpec(cpu=cpu, memory=memory),
        )


@dataclass
class ImageConfig:
    """Image related stuffs."""

    # k8s image registry.
    registry: Union[str, None] = "registry.cn-hongkong.aliyuncs.com"
    repository: str = "graphscope"  # k8s image repository.
    tag: str = __version__  # k8s image tag.
    # A list of secrets to pull image.
    pull_secrets: List[str] = field(default_factory=list)
    pull_policy: str = "IfNotPresent"  # Kubernetes image pull policy.


@dataclass
class MarsConfig:
    """Mars configuration"""

    enable: bool = False  # Enable Mars or not.
    worker_resource: ResourceConfig = field(
        default_factory=lambda: ResourceConfig.make_burstable(0.2, "4Mi")
    )
    scheduler_resource: ResourceConfig = field(
        default_factory=lambda: ResourceConfig.make_burstable(0.2, "4Mi")
    )


@dataclass
class DatasetConfig:
    """A Dataset container could be shipped with GraphScope in kubernetes."""

    enable: bool = False  # Mount the aliyun dataset bucket as a volume by ossfs.
    # A json string specifies the dataset proxy info. Available options of proxy: http_proxy, https_proxy, no_proxy.
    proxy: Union[str, None] = None


@dataclass
class EngineConfig:
    """Engine configuration"""

    enabled_engines: str = "gae,gie"  # A set of engines to enable.
    # Node selector for engine pods, default is None.
    node_selector: Union[str, None] = None

    enable_gae: bool = False  # Enable or disable analytical engine.
    # Enable or disable analytical engine with java support.
    enable_gae_java: bool = False
    enable_gie: bool = False  # Enable or disable interactive engine.
    enable_gle: bool = False  # Enable or disable graphlearn engine.
    enable_glt: bool = False  # Enable or disable graphlearn_torch engine.

    preemptive: bool = True

    # Resource for analytical pod
    gae_resource: ResourceConfig = field(
        default_factory=lambda: ResourceConfig.make_burstable(1, "4Gi")
    )
    # Resource for interactive executor pod
    gie_executor_resource: ResourceConfig = field(
        default_factory=lambda: ResourceConfig.make_burstable(1, "2Gi")
    )
    # Resource for interactive frontend pod
    gie_frontend_resource: ResourceConfig = field(
        default_factory=lambda: ResourceConfig.make_burstable(0.5, "1Gi")
    )
    # Resource for learning pod
    gle_resource: ResourceConfig = field(
        default_factory=lambda: ResourceConfig.make_burstable(0.2, "1Gi")
    )
    # Resource for learning pod
    glt_resource: ResourceConfig = field(
        default_factory=lambda: ResourceConfig.make_burstable(0.2, "1Gi")
    )

    def post_setup(self):
        """Derive the `enable_*` flags from `enabled_engines`.

        `enabled_engines` is split on commas and each name is stripped.
        Empty names are skipped, unknown names are reported on stdout and
        ignored, and known names switch the matching flag on (flags are never
        switched off here). When `preemptive` is true the resource requests
        of every engine are cleared afterwards.
        """
        # Single source of truth for both validation and flag selection, so a
        # name can no longer be accepted without effect ("learning") or take
        # effect while being reported as invalid ("graphlearn",
        # "graphlearn-torch").
        engine_flags = {
            "analytical": "enable_gae",
            "gae": "enable_gae",
            "analytical-java": "enable_gae_java",
            "gae-java": "enable_gae_java",
            "interactive": "enable_gie",
            "gie": "enable_gie",
            "learning": "enable_gle",
            "graphlearn": "enable_gle",
            "gle": "enable_gle",
            "graphlearn-torch": "enable_glt",
            "glt": "enable_glt",
        }
        for item in (name.strip() for name in self.enabled_engines.split(",")):
            if item == "":
                continue
            flag = engine_flags.get(item)
            if flag is None:
                print(f"Not a valid engine name: {item}")
                continue
            setattr(self, flag, True)

        if self.preemptive:
            self.gae_resource.requests = None
            self.gle_resource.requests = None
            self.glt_resource.requests = None
            self.gie_executor_resource.requests = None
            self.gie_frontend_resource.requests = None


@dataclass
class EtcdConfig:
    """Etcd configuration."""

    endpoint: Union[str, None] = None
    """The address of external etcd cluster, with formats like 'etcd01:port,etcd02:port,etcd03:port'.
    If address is set, all other etcd configurations are ignored.
    """
    # The port that etcd server will bind to for accepting client connections. Defaults to 2379.
    listening_client_port: int = 2379
    # The port that etcd server will bind to for accepting peer connections. Defaults to 2380.
    listening_peer_port: int = 2380

    # Kubernetes related config
    replicas: int = 1


@dataclass
class VineyardConfig:
    """Vineyard configuration"""

    # Vineyard IPC socket path, a socket suffixed by timestamp will be created in '/tmp' if not given.
    socket: Union[str, None] = None
    rpc_port: int = 9600  # Vineyard RPC port.

    # Kubernetes related config
    # The name of vineyard deployment, it should exist as expected.
    deployment_name: Union[str, None] = None
    image: str = "vineyardcloudnative/vineyardd:latest"  # Image for vineyard container.
    # Resource for vineyard sidecar container
    resource: ResourceConfig = field(
        default_factory=lambda: ResourceConfig.make_burstable(0.2, "256Mi")
    )


@dataclass
class InteractiveConfig:
    """Interactive configuration"""

    # a map from internal port to external port
    port_mapping: Union[dict, None] = None


@dataclass
class CoordinatorConfig:
    """Coordinator configuration"""

    endpoint: Union[str, None] = None
    """The address of existed coordinator service, with formats like 'ip:port'.
    If address is set, all other coordinator configurations are ignored.
    """
    service_port: int = 63800  # Coordinator service port that will be listening on.
    http_port: int = 8080  # Coordinator HTTP service port

    monitor: bool = False  # Enable or disable prometheus exporter.
    monitor_port: int = 9090  # Coordinator prometheus exporter service port.

    # Kubernetes related config
    # Name of the coordinator deployment and service.
    deployment_name: Union[str, None] = None
    # Node selector for coordinator pod in kubernetes
    node_selector: Union[str, None] = None
    # Resource configuration of coordinator.
    resource: ResourceConfig = field(
        default_factory=lambda: ResourceConfig.make_burstable(0.5, "512Mi")
    )

    # For GraphScope operator
    # Launch coordinator only, do not let coordinator launch resources or delete resources.
    # It would try to find existing resources and connect to it.
    operator_mode: bool = False

    # For http server, limit the max content length of request. Mainly for file upload.
    max_content_length: str = "1G"

    # Only start coordinator http server, do not start others services.
    http_server_only: bool = False


@dataclass
class HostsLauncherConfig:
    """Local cluster configuration."""

    # list of hostnames of graphscope engine workers.
    hosts: List[str] = list_field("localhost")
    # Etcd configuration. Only local session needs to configure etcd.
    etcd: EtcdConfig = field(default_factory=EtcdConfig)

    # The number of retries when downloading dataset from internet.
    dataset_download_retries: int = 3


@dataclass
class KubernetesLauncherConfig:
    """Kubernetes cluster configuration."""

    # The namespace to create all resource, which must exist in advance.
    namespace: Union[str, None] = None
    delete_namespace: bool = False  # Delete the namespace that created by graphscope.

    config_file: Union[str, None] = None  # kube config file path

    # The deployment mode of engines on the kubernetes cluster, choose from 'eager' or 'lazy'.
    deployment_mode: str = "eager"

    # Service type, choose from 'NodePort' or 'LoadBalancer'.
    service_type: str = "NodePort"

    # A base64 encoded json string specifies the kubernetes volumes to mount.
    volumes: Union[str, None] = None

    # Wait until the graphscope instance has been deleted successfully.
    waiting_for_delete: bool = False

    image: ImageConfig = field(default_factory=ImageConfig)  # Image configuration.
    engine: EngineConfig = field(default_factory=EngineConfig)  # Engine configuration.

    # Dataset configuration.
    dataset: DatasetConfig = field(default_factory=DatasetConfig)
    mars: MarsConfig = field(default_factory=MarsConfig)  # Mars configuration.
@dataclass
class OperatorLauncherConfig:
    """Launcher configuration used in operator mode."""

    namespace: str = "default"
    gae_endpoint: str = ""
    hosts: List[str] = list_field()


@dataclass
class SessionConfig:
    """Session configuration"""

    num_workers: int = 2  # The number of graphscope engine workers.

    # The number of graphscope engine workers when launch local workers.
    default_local_num_workers: int = 1

    reconnect: bool = False  # Connect to an existed GraphScope Cluster
    instance_id: Union[str, None] = None  # Unique id for each GraphScope instance.

    # The length of time to wait before giving up launching graphscope.
    timeout_seconds: int = 600
    # The length of time to wait starting from client disconnected before killing the graphscope instance.
    dangling_timeout_seconds: int = 600

    # The length of time to wait before retrying to launch graphscope.
    retry_time_seconds: int = 1

    execution_mode: str = "eager"  # The deploying mode of graphscope, eager or lazy.


@dataclass
class Config(Serializable):
    """Top-level GraphScope configuration aggregating all sub-configurations."""

    # Solution under the FLEX architecture, choose from 'GraphScope One', 'Interactive' or 'GraphScope Insight'
    solution: str = "GraphScope One"

    # Launcher type, choose from 'hosts', 'k8s' or 'operator'.
    launcher_type: str = "k8s"

    # Show log or not.
    show_log: bool = False
    # Log level, choose from 'info' or 'debug'.
    log_level: str = "info"

    # Session configuration.
    session: SessionConfig = field(default_factory=SessionConfig)

    # Coordinator configuration.
    coordinator: CoordinatorConfig = field(default_factory=CoordinatorConfig)

    # Vineyard configuration.
    vineyard: VineyardConfig = field(default_factory=VineyardConfig)

    # Interactive configuration.
    interactive: InteractiveConfig = field(default_factory=InteractiveConfig)

    # Local cluster configuration.
    hosts_launcher: HostsLauncherConfig = field(default_factory=HostsLauncherConfig)

    # Kubernetes cluster configuration.
    kubernetes_launcher: KubernetesLauncherConfig = field(
        default_factory=KubernetesLauncherConfig
    )

    # Launcher used in operator mode.
    operator_launcher: OperatorLauncherConfig = field(
        default_factory=OperatorLauncherConfig
    )

    def set_option(self, key, value):  # noqa: C901
        """Forward set_option target to actual config fields

        Args:
            key: Flat option name (for example ``"k8s_namespace"``).
            value: Value to store. Node selectors and ``k8s_volumes`` are
                JSON-serialized and base64-encoded before being stored; every
                other value is stored or forwarded unchanged.

        Raises:
            ValueError: If `key` is not one of the recognized option names.
        """
        if key == "addr":
            self.coordinator.endpoint = value
        elif key == "mode":
            self.session.execution_mode = value
        elif key == "cluster_type":
            self.launcher_type = value
        elif key == "k8s_namespace":
            self.kubernetes_launcher.namespace = value
        elif key == "k8s_image_registry":
            self.kubernetes_launcher.image.registry = value
        elif key == "k8s_image_repository":
            self.kubernetes_launcher.image.repository = value
        elif key == "k8s_image_tag":
            self.kubernetes_launcher.image.tag = value
        elif key == "k8s_image_pull_policy":
            self.kubernetes_launcher.image.pull_policy = value
        elif key == "k8s_image_secrets":
            self.kubernetes_launcher.image.pull_secrets = value
        elif key == "k8s_coordinator_cpu":
            self.coordinator.resource.set_cpu_request(value)
        elif key == "k8s_coordinator_mem":
            self.coordinator.resource.set_mem_request(value)
        elif key == "etcd_addrs":
            self.hosts_launcher.etcd.endpoint = value
        elif key == "etcd_listening_client_port":
            self.hosts_launcher.etcd.listening_client_port = value
        elif key == "etcd_listening_peer_port":
            self.hosts_launcher.etcd.listening_peer_port = value
        elif key == "k8s_vineyard_image":
            self.vineyard.image = value
        elif key == "k8s_vineyard_deployment":
            self.vineyard.deployment_name = value
        elif key == "k8s_vineyard_cpu":
            self.vineyard.resource.set_cpu_request(value)
        elif key == "k8s_vineyard_mem":
            self.vineyard.resource.set_mem_request(value)
        elif key == "k8s_engine_cpu":
            # Only the analytical engine resource is updated by this key.
            self.kubernetes_launcher.engine.gae_resource.set_cpu_request(value)
        elif key == "k8s_engine_mem":
            self.kubernetes_launcher.engine.gae_resource.set_mem_request(value)
        elif key == "mars_worker_cpu":
            self.kubernetes_launcher.mars.worker_resource.set_cpu_request(value)
        elif key == "mars_worker_mem":
            self.kubernetes_launcher.mars.worker_resource.set_mem_request(value)
        elif key == "mars_scheduler_cpu":
            self.kubernetes_launcher.mars.scheduler_resource.set_cpu_request(value)
        elif key == "mars_scheduler_mem":
            self.kubernetes_launcher.mars.scheduler_resource.set_mem_request(value)
        elif key == "k8s_coordinator_pod_node_selector":
            self.coordinator.node_selector = base64_encode(json.dumps(value))
        elif key == "k8s_engine_pod_node_selector":
            self.kubernetes_launcher.engine.node_selector = base64_encode(
                json.dumps(value)
            )
        elif key == "enabled_engines":
            self.kubernetes_launcher.engine.enabled_engines = value
        elif key == "with_mars":
            self.kubernetes_launcher.mars.enable = value
        elif key == "with_dataset":
            self.kubernetes_launcher.dataset.enable = value
        elif key == "k8s_volumes":
            self.kubernetes_launcher.volumes = base64_encode(json.dumps(value))
        elif key == "k8s_service_type":
            self.kubernetes_launcher.service_type = value
        elif key == "preemptive":
            self.kubernetes_launcher.engine.preemptive = value
        elif key == "k8s_deploy_mode":
            self.kubernetes_launcher.deployment_mode = value
        elif key == "k8s_waiting_for_delete":
            self.kubernetes_launcher.waiting_for_delete = value
        elif key == "num_workers":
            # One option drives both the cluster and the local worker count.
            self.session.num_workers = value
            self.session.default_local_num_workers = value
        elif key == "show_log":
            self.show_log = value
        elif key == "log_level":
            self.log_level = value
        elif key == "timeout_seconds":
            self.session.timeout_seconds = value
        elif key == "dangling_timeout_seconds":
            self.session.dangling_timeout_seconds = value
        elif key == "dataset_download_retries":
            self.hosts_launcher.dataset_download_retries = value
        elif key == "k8s_client_config":
            self.kubernetes_launcher.config_file = value
        elif key == "reconnect":
            self.session.reconnect = value
        elif key == "vineyard_shared_mem":
            # Recognized but has no matching config field; the value is dropped.
            pass
        else:
            # f-string instead of `+` so a non-string key still produces the
            # documented ValueError rather than a TypeError from concatenation.
            raise ValueError(f"Key not recognized: {key}")


def base64_encode(string):
    """Return the base64 encoding of the UTF-8 bytes of `string` as a str."""
    return base64.b64encode(string.encode("utf-8")).decode("utf-8", errors="ignore")


# Module-level configuration instance built from the defaults above.
gs_config = Config()


if __name__ == "__main__":
    # Manual smoke test: round-trip a config through YAML, then build a parser.
    config = Config()
    config.coordinator.resource.requests = None
    config.kubernetes_launcher.image.registry = ""
    print(config.dumps_yaml())
    # print(config.dumps_json())
    s = config.dumps_yaml()
    config3 = Config.loads_yaml(s)
    print(config3)

    parser = ArgumentParser()
    parser.add_arguments(Config, dest="gs")
    args = parser.parse_args()