odps/mars_extension/legacy/deploy/app.py (350 lines of code) (raw):
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import base64
import logging
import json
import os
import sys
from mars.deploy.kubernetes.client import KubernetesCluster, new_cluster
from mars.deploy.kubernetes.config import (
VolumeConfig,
MarsReplicationControllerConfig,
MarsSchedulersConfig,
MarsWorkersConfig,
MarsWebsConfig,
)
from mars.deploy.utils import wait_services_ready
from mars.utils import readable_size
from ...utils import build_mars_image_name
logger = logging.getLogger(__name__)
UI_PORT = 50002
NOTEBOOK_PORT = 50003
GS_COORDINATOR_PORT = 63800
GS_COORDINATOR_GATEWAY_PORT = 64800
class DiskDriverVolumeConfig(VolumeConfig):
def __init__(self, name, mount_path, device_size, renew_interval=3, timeout=120):
super().__init__(name, mount_path)
self._device_size = device_size
self._timeout = timeout
self._renew_interval = renew_interval
def build(self):
return {
"name": self.name,
"diskDriverEphemeralDevice": {
"deviceSize": self._device_size,
"timeout": self._timeout,
"renewInterval": self._renew_interval,
},
}
class CupidMarsConfigMixin:
def __init__(self, *args, **kwargs):
self._container_port = kwargs.pop("container_port", None)
super().__init__(*args, **kwargs)
@property
def stat_type(self):
return "cgroup" if os.environ.get("VM_ENGINE_TYPE") != "hyper" else None
@staticmethod
def get_local_app_module(mod_name):
from . import scheduler
return scheduler.__name__.rsplit(".", 1)[0] + "." + mod_name
def build_container(self):
container_config = super().build_container()
container_config.update({"enableChannel": True, "enableNativeLib": True})
if self._container_port is not None:
container_config["ports"] = {
"containerPort": int(self._container_port),
}
return container_config
def build_template_spec(self):
spec_config = super().build_template_spec()
if os.environ.get("VM_ENGINE_TYPE") == "hyper":
spec_config.update({"restartPolicy": "Never"})
else:
spec_config.update({"hostNetwork": True, "restartPolicy": "Never"})
return spec_config
def config_readiness_probe(self):
return None
def build(self):
result = super().build()
with open("pod_%s.json" % self.rc_name, "w") as pod_file:
pod_file.write(json.dumps(result, indent=2, sort_keys=True))
return result
def remove_env(self, name):
try:
super().remove_env(name)
except AttributeError:
self._envs.pop(name, None)
class CupidMarsSchedulersConfig(CupidMarsConfigMixin, MarsSchedulersConfig):
def add_default_envs(self):
super().add_default_envs()
self.remove_env("MARS_USE_CGROUP_STAT")
self.add_env("MARS_DISABLE_FAILOVER", "1")
self.add_env("MARS_POD_ROLE", "scheduler")
if self.stat_type == "cgroup":
self.add_env("MARS_MEM_USE_CGROUP_STAT", "1")
self.add_env("MARS_CPU_USE_PROCESS_STAT", "1")
class CupidMarsWorkersConfig(CupidMarsConfigMixin, MarsWorkersConfig):
def __init__(self, *args, **kwargs):
if os.environ.get("VM_ENGINE_TYPE") == "hyper":
kwargs["mount_shm"] = kwargs.get("mount_shm", None) or False
kwargs["min_cache_mem"] = kwargs.get("min_cache_mem", None) or "10%"
super().__init__(*args, **kwargs)
def add_default_envs(self):
super().add_default_envs()
self.add_env("MARS_LOCK_FREE_FILEIO", "1")
self.add_env("MARS_DISABLE_PROC_RECOVER", "1")
self.add_env("MARS_PLASMA_LIMIT", "95%")
self.add_env("MARS_POD_ROLE", "worker")
self.remove_env("MARS_USE_CGROUP_STAT")
if self.stat_type == "cgroup":
self.add_env("MARS_CPU_USE_PROCESS_STAT", "1")
self.add_env("MARS_MEM_USE_CGROUP_STAT", "1")
class CupidMarsWebsConfig(CupidMarsConfigMixin, MarsWebsConfig):
def __init__(self, *args, **kwargs):
if os.environ.get("VM_ENGINE_TYPE") == "hyper":
default_port = UI_PORT
else:
default_port = None
kwargs["service_port"] = kwargs.get("service_port", None) or default_port
super().__init__(*args, **kwargs)
def add_default_envs(self):
super().add_default_envs()
self.add_env("MARS_POD_ROLE", "web")
if os.environ.get("VM_ENGINE_TYPE") == "hyper":
self.add_env("MARS_UI_PORT", str(UI_PORT))
class CupidMarsNotebooksConfig(CupidMarsConfigMixin, MarsReplicationControllerConfig):
rc_name = "marsnotebook"
def __init__(self, *args, **kwargs):
if os.environ.get("VM_ENGINE_TYPE") == "hyper":
default_port = NOTEBOOK_PORT
else:
default_port = None
kwargs["service_port"] = kwargs.get("service_port", None) or default_port
super().__init__(*args, **kwargs)
def build_container_command(self):
return [
"/srv/entrypoint.sh",
self.get_local_app_module("notebook"),
]
def add_default_envs(self):
super().add_default_envs()
self.add_env("MARS_POD_ROLE", "notebook")
if os.environ.get("VM_ENGINE_TYPE") == "hyper":
self.add_env("MARS_NOTEBOOK_PORT", self._container_port)
class CupidGSCoordinatorConfig(CupidMarsConfigMixin, MarsReplicationControllerConfig):
rc_name = "gscoordinator"
def __init__(self, *args, **kwargs):
if os.environ.get("VM_ENGINE_TYPE") == "hyper":
default_port = GS_COORDINATOR_GATEWAY_PORT
else:
default_port = None
kwargs["service_port"] = kwargs.get("service_port", None) or default_port
self._worker_pod_name_list = kwargs.pop("worker_pod_name_list", None)
self._worker_pod_ip_list = kwargs.pop("worker_pod_ip_list", None)
self._port = GS_COORDINATOR_PORT
self._gateway_port = kwargs["service_port"]
self._worker_num = kwargs.pop("worker_num", 1)
super().__init__(*args, **kwargs)
def build_container_command(self):
# replace command with coordinator
coordinator_args = dict(
port=self._port,
gateway_port=self._gateway_port,
worker_pod_name_list=self._worker_pod_name_list,
worker_pod_ip_list=self._worker_pod_ip_list,
num_workers=self._worker_num,
)
cmd = [
"/srv/entrypoint.sh",
self.get_local_app_module("gscoordinator"),
base64.b64encode(json.dumps(coordinator_args).encode()).decode(),
]
return cmd
def add_default_envs(self):
super().add_default_envs()
self.add_env("MARS_POD_ROLE", "gscoordinator")
if os.environ.get("VM_ENGINE_TYPE") == "hyper":
self.add_env("GS_COORDINATOR_GATEWAY_PORT", self._container_port)
self.add_env("GS_COORDINATOR_PORT", self._container_port)
class CupidKubernetesCluster(KubernetesCluster):
_scheduler_config_cls = CupidMarsSchedulersConfig
_worker_config_cls = CupidMarsWorkersConfig
_web_config_cls = CupidMarsWebsConfig
_default_service_port = None
def __init__(self, *args, **kwargs):
self._with_notebook = kwargs.pop("with_notebook", False)
self._notebook_cpu = kwargs.pop("notebook_cpu", None)
self._notebook_mem = kwargs.pop("notebook_mem", None)
self._notebook_extra_env = kwargs.get("extra_env", None) or dict()
self._notebook_extra_env.update(
kwargs.pop("notebook_extra_env", None) or dict()
)
self._with_graphscope = kwargs.pop("with_graphscope", False)
self._coordinator_extra_env = kwargs.get("extra_env", None) or dict()
self._coordinator_extra_env.update(
kwargs.pop("coordinator_extra_env", None) or dict()
)
self._coordinator_cpu = kwargs.pop("coordinator_cpu", None)
self._coordinator_mem = kwargs.pop("coordinator_mem", None)
self._node_blacklist = set(kwargs.pop("node_blacklist", None) or [])
self._blacklisted_pods = set()
kwargs["image"] = build_mars_image_name(kwargs.pop("image", None))
super().__init__(*args, **kwargs)
def _create_notebook(self):
if self._with_notebook:
notebook_config = CupidMarsNotebooksConfig(
1,
image=self._image,
cpu=self._notebook_cpu,
memory=self._notebook_mem,
volumes=self._extra_volumes,
pre_stop_command=self._pre_stop_command,
)
notebook_config.add_simple_envs(self._notebook_extra_env)
self._core_api.create_namespaced_replication_controller(
self._namespace, notebook_config.build()
)
def _create_graphscope(self):
if self._with_graphscope:
# check the worker services are ready
limits = [self._worker_num]
selectors = ["mars/service-type=" + CupidMarsWorkersConfig.rc_name]
wait_services_ready(
selectors,
limits,
lambda sel: self._get_ready_pod_count(sel),
timeout=self._timeout,
)
# get the worker pod name and ip
worker_pod_name_list, worker_pod_ip_list = self._get_pods_name_and_ip(
selectors[0]
)
coordinator_config = CupidGSCoordinatorConfig(
1,
image=self._image,
cpu=self._coordinator_cpu,
memory=self._coordinator_mem,
volumes=self._extra_volumes,
pre_stop_command=self._pre_stop_command,
worker_pod_name_list=worker_pod_name_list,
worker_pod_ip_list=worker_pod_ip_list,
worker_num=self._worker_num,
)
coordinator_config.add_simple_envs(self._coordinator_extra_env)
self._core_api.create_namespaced_replication_controller(
self._namespace, coordinator_config.build()
)
def _create_services(self):
super()._create_services()
self._create_graphscope()
self._create_notebook()
def _create_kube_service(self):
# does not create k8s service as not supported in cupid
pass
def _get_ready_pod_count(self, label_selector):
if self._node_blacklist:
query = self._core_api.list_namespaced_pod(
namespace=self._namespace, label_selector=label_selector
).to_dict()
for el in query["items"]:
node_name = el.get("spec", {}).get("node_name")
pod_name = el["metadata"]["name"]
if pod_name in self._blacklisted_pods:
continue
if node_name and node_name in self._node_blacklist:
logger.warning(
"Found node %s in blacklist, will terminate pod %s",
node_name,
pod_name,
)
try:
self._core_api.delete_namespaced_pod(
name=pod_name, namespace=self._namespace
)
self._blacklisted_pods.add(pod_name)
except: # noqa: E722
pass
return super()._get_ready_pod_count(label_selector)
def _get_pods_name_and_ip(self, label_selector):
pod_name_list = []
pod_ip_list = []
query = self._core_api.list_namespaced_pod(
namespace=self._namespace, label_selector=label_selector
).to_dict()
for el in query["items"]:
if el["status"]["reason"] == "CupidStarted":
pod_name_list.append(el["metadata"]["name"])
pod_ip_list.append(el["status"]["pod_ip"])
return pod_name_list, pod_ip_list
class MarsCupidServer(object):
def __init__(self):
self.args = None
self._instance_id = None
self._kube_url = None
self._kube_client = None
def __call__(self, argv=None):
if argv is None:
argv = sys.argv[1:]
return self._main(argv)
def config_logging(self):
import logging.config
import mars
log_conf = self.args.log_conf or "logging.conf"
conf_file_paths = [
"",
os.path.abspath("."),
os.path.dirname(os.path.dirname(mars.__file__)),
]
log_configured = False
for path in conf_file_paths:
conf_path = os.path.join(path, log_conf) if path else log_conf
if os.path.exists(conf_path):
logging.config.fileConfig(conf_path, disable_existing_loggers=False)
log_configured = True
break
if not log_configured:
log_level = self.args.log_level
level = getattr(logging, log_level.upper()) if log_level else logging.INFO
logging.getLogger("mars").setLevel(level)
logging.basicConfig(format=self.args.log_format)
def _main(self, argv=None):
parser = argparse.ArgumentParser(description="Mars Cupid Application")
parser.add_argument("--log-level", help="log level")
parser.add_argument("--log-format", help="log format")
parser.add_argument(
"--log-conf", help="log config file, logging.conf by default"
)
parser.add_argument("encoded_args")
self.args = parser.parse_args(argv)
self.config_logging()
self._kube_url = os.environ["KUBE_API_ADDRESS"].strip('"')
self._instance_id = os.environ["KUBE_NAMESPACE"].strip('"')
args_dict = json.loads(base64.b64decode(self.args.encoded_args).decode())
args_dict["namespace"] = self._instance_id
if args_dict.get("worker_disk_num") is not None:
disk_num = args_dict.pop("worker_disk_num")
disk_size = args_dict.pop("worker_disk_size")
args_dict["worker_spill_paths"] = [
DiskDriverVolumeConfig(
name="diskdriver-volume%d" % i,
mount_path="/diskdriver%d" % i,
device_size=readable_size(disk_size, trunc=True).lower(),
)
for i in range(disk_num)
]
scheduler_extra_env = args_dict.get("scheduler_extra_env") or dict()
if args_dict.get("instance_idle_timeout") is not None:
idle_timeout = args_dict.pop("instance_idle_timeout")
scheduler_extra_env["MARS_INSTANCE_IDLE_TIMEOUT"] = str(idle_timeout)
args_dict["scheduler_extra_env"] = scheduler_extra_env
extra_env = args_dict.get("extra_env") or dict()
extra_env["KUBE_API_ADDRESS"] = self._kube_url
extra_env["KUBE_NAMESPACE"] = self._instance_id
args_dict["extra_env"] = extra_env
new_cluster(
self.get_instance_kube(), cluster_cls=CupidKubernetesCluster, **args_dict
)
logger.info("Cluster creation finished")
def get_instance_kube(self):
from kubernetes import client
config = client.Configuration()
config.host = self._kube_url
return client.ApiClient(config)
main = MarsCupidServer()
if __name__ == "__main__":
main()