coordinator/gscoordinator/local_launcher.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020-2023 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import base64
import json
import logging
import os
import shutil
import socket
import subprocess
import sys
import time
from typing import List

from graphscope.config import Config
from graphscope.framework.utils import PipeWatcher
from graphscope.framework.utils import get_free_port
from graphscope.framework.utils import get_java_version
from graphscope.framework.utils import get_tempdir
from graphscope.framework.utils import is_free_port
from graphscope.proto import message_pb2
from graphscope.proto import types_pb2

from gscoordinator.launcher import AbstractLauncher
from gscoordinator.utils import ANALYTICAL_ENGINE_PATH
from gscoordinator.utils import GRAPHSCOPE_HOME
from gscoordinator.utils import INTERACTIVE_ENGINE_SCRIPT
from gscoordinator.utils import INTERACTIVE_ENGINE_THREADS_PER_WORKER
from gscoordinator.utils import WORKSPACE
from gscoordinator.utils import ResolveMPICmdPrefix
from gscoordinator.utils import get_timestamp
from gscoordinator.utils import parse_as_glog_level
from gscoordinator.utils import run_command

logger = logging.getLogger("graphscope")


class LocalLauncher(AbstractLauncher):
    def __init__(self, config):
        super().__init__()
        self._config: Config = config
        session_config = config.session
        vineyard_config = config.vineyard
        launcher_config = config.hosts_launcher
        # glog level
        self._glog_level = parse_as_glog_level(config.log_level)
        # Session Config
        self._num_workers = session_config.num_workers
        self._instance_id = session_config.instance_id
        self._timeout_seconds = session_config.timeout_seconds
        self._retry_time_seconds = session_config.retry_time_seconds
        # Vineyard Config
        self._vineyard_socket = vineyard_config.socket
        self._vineyard_rpc_port = vineyard_config.rpc_port
        # Launcher Config
        self._hosts = launcher_config.hosts
        self._external_etcd_addr = launcher_config.etcd.endpoint
        self._etcd_listening_client_port = launcher_config.etcd.listening_client_port
        self._etcd_listening_peer_port = launcher_config.etcd.listening_peer_port

        # A graphscope instance may have multiple sessions by reconnecting to
        # the coordinator
        self._instance_workspace = os.path.join(WORKSPACE, self._instance_id)
        os.makedirs(self._instance_workspace, exist_ok=True)
        # set when a client connects to the coordinator
        self._session_workspace = None

        # etcd
        self._etcd_process = None
        self._etcd_endpoint = None
        # vineyardd
        self._vineyardd_process = None
        # analytical engine
        self._analytical_engine_process = None
        # interactive engine
        # executor inter-processing port
        # executor rpc port
        # frontend port
        self._interactive_port = 8233
        while not is_free_port(self._interactive_port):
            self._interactive_port += 10
        # learning instance processes
        self._learning_instance_processes = {}

    def type(self):
        return types_pb2.HOSTS

    def stop(self, is_dangling=False):
        self.close_analytical_instance()
        self.close_vineyard()
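
    # Rough lifecycle sketch for this launcher; the Config fields mirror the ones
    # read in __init__ above, and any literal values are illustrative assumptions:
    #
    #   config = Config()                       # graphscope.config.Config
    #   config.session.num_workers = 2
    #   launcher = LocalLauncher(config)
    #   launcher.start()                        # vineyardd (plus etcd when distributed)
    #   launcher.create_analytical_instance()   # GAE engine, possibly via MPI
    #   ...
    #   launcher.stop()                         # tears the engine and vineyard down
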
    def set_session_workspace(self, session_id: str):
        self._session_workspace = os.path.join(self._instance_workspace, session_id)
        os.makedirs(self._session_workspace, exist_ok=True)

    def get_namespace(self) -> str:
        return ""

    @property
    def hosts(self) -> List[str]:
        return self._hosts

    @property
    def vineyard_socket(self) -> str:
        return self._vineyard_socket

    @property
    def vineyard_endpoint(self) -> str:
        return f"{self._hosts[0]}:{self._vineyard_rpc_port}"

    def create_analytical_instance(self):
        mpi_resolver = ResolveMPICmdPrefix()
        cmd, mpi_env = mpi_resolver.resolve(self._num_workers, self._hosts)

        master = self.hosts[0]
        rpc_port = get_free_port(master)
        self._analytical_engine_endpoint = f"{master}:{rpc_port}"

        cmd.append(ANALYTICAL_ENGINE_PATH)
        cmd.extend(["--host", "0.0.0.0"])
        cmd.extend(["--port", str(rpc_port)])

        if mpi_resolver.openmpi():
            cmd.extend(["-v", str(self._glog_level)])
        else:
            mpi_env["GLOG_v"] = str(self._glog_level)

        if self.vineyard_socket is not None:
            cmd.extend(["--vineyard_socket", self.vineyard_socket])

        env = os.environ.copy()
        env.update(mpi_env)
        env["GRAPHSCOPE_HOME"] = GRAPHSCOPE_HOME

        logger.info("Launch analytical engine with command: %s", " ".join(cmd))
        process = self._popen_helper(
            cmd, cwd=os.getcwd(), env=env, stderr=subprocess.PIPE
        )

        logger.info("Server is initializing analytical engine.")
        stdout_watcher = PipeWatcher(process.stdout, sys.stdout)
        stderr_watcher = PipeWatcher(process.stderr, sys.stderr)
        setattr(process, "stdout_watcher", stdout_watcher)
        setattr(process, "stderr_watcher", stderr_watcher)
        self._analytical_engine_process = process

        start_time = time.time()
        while is_free_port(rpc_port):
            if process.poll() is not None:
                msg = "Launch analytical engine failed: "
                msg += "\n".join([line for line in stderr_watcher.poll_all()])
                raise RuntimeError(msg)
            if self._timeout_seconds + start_time < time.time():
                self._analytical_engine_process.kill()
                raise RuntimeError("Launch analytical engine failed due to timeout.")
            time.sleep(self._retry_time_seconds)
        logger.info(
            "Analytical engine is listening on %s", self._analytical_engine_endpoint
        )

    def create_interactive_instance(
        self, object_id: int, schema_path: str, params: dict, with_cypher: bool
    ):
        try:
            logger.info("Java version: %s", get_java_version())
        except:  # noqa: E722
            logger.exception("Cannot get version of java")

        env = os.environ.copy()
        env["GRAPHSCOPE_HOME"] = GRAPHSCOPE_HOME
        if os.environ.get("PARALLEL_INTERACTIVE_EXECUTOR_ON_VINEYARD", "OFF") != "ON":
            # only one GIE/GAIA executor will be launched locally, even if there
            # are multiple GAE engines
            num_workers = 1
            threads_per_worker = int(
                os.environ.get(
                    "THREADS_PER_WORKER", INTERACTIVE_ENGINE_THREADS_PER_WORKER
                )
            )
            env["THREADS_PER_WORKER"] = str(threads_per_worker * self._num_workers)
        else:
            num_workers = self._num_workers

        params = "\n".join([f"{k}={v}" for k, v in params.items()])
        params = base64.b64encode(params.encode("utf-8")).decode("utf-8")
        neo4j_disabled = "true" if not with_cypher else "false"
        cmd = [
            INTERACTIVE_ENGINE_SCRIPT,
            "create_gremlin_instance_on_local",
            self._session_workspace,
            str(object_id),
            schema_path,
            str(num_workers),  # server size
            str(self._interactive_port),  # executor port
            str(self._interactive_port + 1),  # executor rpc port
            str(self._interactive_port + 2 * num_workers),  # frontend gremlin port
            str(self._interactive_port + 2 * num_workers + 1),  # frontend cypher port
            self.vineyard_socket,
            neo4j_disabled,
            params,
        ]
        logger.info("Create GIE instance with command: %s", " ".join(cmd))
        self._interactive_port += 2 * num_workers + 2
        return self._popen_helper(cmd, cwd=os.getcwd(), env=env)
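
    # Worked example of the port layout above: assuming the initial base port 8233
    # is free and num_workers == 1, the instance gets executor port 8233, executor
    # rpc port 8234, frontend gremlin port 8235 and frontend cypher port 8236; the
    # base then advances by 2 * num_workers + 2, so the next instance starts at 8237.
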
    @staticmethod
    def _popen_helper(cmd, cwd, env, stdout=None, stderr=None):
        # Default values that serve for simple cases,
        # where the caller is not interested in the output.
        if stdout is None:
            stdout = subprocess.PIPE
        if stderr is None:
            stderr = subprocess.STDOUT
        process = subprocess.Popen(
            cmd,
            start_new_session=True,
            cwd=cwd,
            env=env,
            encoding="utf-8",
            errors="replace",
            stdin=subprocess.DEVNULL,
            stdout=stdout,
            stderr=stderr,
            universal_newlines=True,
            bufsize=1,
        )
        return process

    def create_learning_instance(self, object_id, handle, config, learning_backend):
        if learning_backend == message_pb2.LearningBackend.GRAPHLEARN:
            return self._create_graphlearn_instance(
                object_id=object_id, handle=handle, config=config
            )
        elif learning_backend == message_pb2.LearningBackend.GRAPHLEARN_TORCH:
            return self._create_graphlearn_torch_instance(
                object_id=object_id, handle=handle, config=config
            )
        else:
            raise ValueError("invalid learning backend")

    def _create_graphlearn_instance(self, object_id, handle, config):
        # prepare argument
        handle = json.loads(
            base64.b64decode(handle.encode("utf-8", errors="ignore")).decode(
                "utf-8", errors="ignore"
            )
        )
        server_list = [f"localhost:{get_free_port()}" for _ in range(self.num_workers)]
        hosts = ",".join(server_list)
        handle["server"] = hosts
        handle = base64.b64encode(
            json.dumps(handle).encode("utf-8", errors="ignore")
        ).decode("utf-8", errors="ignore")

        # launch the server
        env = os.environ.copy()
        # set coordinator dir to PYTHONPATH
        python_path = (
            env.get("PYTHONPATH", "")
            + os.pathsep
            + os.path.dirname(os.path.dirname(__file__))
        )
        env["PYTHONPATH"] = python_path

        self._learning_instance_processes[object_id] = []
        for index in range(self._num_workers):
            cmd = [
                sys.executable,
                "-m",
                "gscoordinator.launch_graphlearn",
                handle,
                config,
                str(index),
            ]
            logger.debug("launching graphlearn server: %s", " ".join(cmd))
            proc = self._popen_helper(cmd, cwd=None, env=env)
            stdout_watcher = PipeWatcher(proc.stdout, sys.stdout)
            stdout_watcher.suppress(not logger.isEnabledFor(logging.DEBUG))
            setattr(proc, "stdout_watcher", stdout_watcher)
            self._learning_instance_processes[object_id].append(proc)
        return server_list
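
    # The learning "handle" is a base64-encoded JSON blob; the round trip used above
    # amounts to the following minimal sketch (the "server" key is the only field the
    # method rewrites, and the port values here are illustrative):
    #
    #   decoded = json.loads(base64.b64decode(handle).decode("utf-8"))
    #   decoded["server"] = "localhost:50051,localhost:50052"
    #   handle = base64.b64encode(json.dumps(decoded).encode("utf-8")).decode("utf-8")
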
    def _create_graphlearn_torch_instance(self, object_id, handle, config):
        handle = json.loads(
            base64.b64decode(handle.encode("utf-8", errors="ignore")).decode(
                "utf-8", errors="ignore"
            )
        )

        server_client_master_port = get_free_port("localhost")
        handle["server_client_master_port"] = server_client_master_port
        handle["master_addr"] = "localhost"

        server_list = [f"localhost:{server_client_master_port}"]
        # for train, val and test
        for _ in range(3):
            server_list.append("localhost:" + str(get_free_port("localhost")))

        handle = base64.b64encode(
            json.dumps(handle).encode("utf-8", errors="ignore")
        ).decode("utf-8", errors="ignore")

        # launch the server
        env = os.environ.copy()
        # set coordinator dir to PYTHONPATH
        python_path = (
            env.get("PYTHONPATH", "")
            + os.pathsep
            + os.path.dirname(os.path.dirname(__file__))
        )
        env["PYTHONPATH"] = python_path

        self._learning_instance_processes[object_id] = []
        for index in range(self._num_workers):
            cmd = [
                sys.executable,
                "-m",
                "gscoordinator.launch_graphlearn_torch",
                handle,
                config,
                str(index),
            ]
            # logger.debug("launching graphlearn_torch server: %s", " ".join(cmd))
            proc = subprocess.Popen(
                cmd,
                env=env,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                encoding="utf-8",
                errors="replace",
                universal_newlines=True,
                bufsize=1,
            )
            logger.debug("suppressed: %s", (not logger.isEnabledFor(logging.DEBUG)))
            stdout_watcher = PipeWatcher(
                proc.stdout,
                sys.stdout,
                suppressed=(not logger.isEnabledFor(logging.DEBUG)),
            )
            time.sleep(5)
            logger.debug("process status: %s", proc.poll())
            setattr(proc, "stdout_watcher", stdout_watcher)
            self._learning_instance_processes[object_id].append(proc)
        return server_list

    def close_analytical_instance(self):
        self._stop_subprocess(self._analytical_engine_process, kill=True)
        self._analytical_engine_endpoint = None

    def close_interactive_instance(self, object_id):
        env = os.environ.copy()
        env["GRAPHSCOPE_HOME"] = GRAPHSCOPE_HOME
        cmd = [
            INTERACTIVE_ENGINE_SCRIPT,
            "close_gremlin_instance_on_local",
            self._session_workspace,
            str(object_id),
        ]
        logger.info("Close GIE instance with command: %s", " ".join(cmd))
        process = self._popen_helper(cmd, cwd=os.getcwd(), env=env)
        # wait for the close script to finish, bounded by the session timeout
        process.wait(timeout=self._timeout_seconds)
        return process

    def close_learning_instance(self, object_id, learning_backend=0):
        if object_id not in self._learning_instance_processes:
            return
        # terminate the processes
        for proc in self._learning_instance_processes[object_id]:
            self._stop_subprocess(proc, kill=True)
        self._learning_instance_processes.clear()

    def launch_etcd(self):
        if not is_free_port(self._etcd_listening_client_port):
            self._etcd_listening_client_port = get_free_port()
        if not is_free_port(self._etcd_listening_peer_port):
            self._etcd_listening_peer_port = get_free_port()

        local_hostname = "127.0.0.1"
        if len(self._hosts) > 1:
            try:
                local_hostname = socket.gethostname()
                # make sure the hostname is dns-resolvable
                socket.gethostbyname(local_hostname)
            except:  # noqa: E722
                # fall back to an address that is guaranteed to resolve
                local_hostname = "127.0.0.1"

        self._etcd_endpoint = (
            f"http://{local_hostname}:{self._etcd_listening_client_port}"
        )

        env = os.environ.copy()
        env.update({"ETCD_MAX_TXN_OPS": "102400"})

        etcd_exec = self.find_etcd()
        cmd = etcd_exec + [
            "--data-dir",
            str(self._instance_workspace),
            "--listen-peer-urls",
            f"http://0.0.0.0:{self._etcd_listening_peer_port}",
            "--listen-client-urls",
            f"http://0.0.0.0:{self._etcd_listening_client_port}",
            "--advertise-client-urls",
            self._etcd_endpoint,
            "--initial-cluster",
            f"default=http://127.0.0.1:{self._etcd_listening_peer_port}",
            "--initial-advertise-peer-urls",
            f"http://127.0.0.1:{self._etcd_listening_peer_port}",
        ]
        logger.info("Launch etcd with command: %s", " ".join(cmd))
        logger.info("Server is initializing etcd.")

        process = self._popen_helper(cmd, cwd=os.getcwd(), env=env)
        stdout_watcher = PipeWatcher(
            process.stdout,
            sys.stdout,
            drop=False,
            suppressed=False,
        )
        setattr(process, "stdout_watcher", stdout_watcher)
        self._etcd_process = process

        start_time = time.time()
        while is_free_port(self._etcd_listening_client_port):
            if self._timeout_seconds + start_time < time.time():
                self._etcd_process.kill()
                outs, _ = self._etcd_process.communicate()
                logger.error("Start etcd timeout, %s", outs)
                msg = "Launch etcd service failed due to timeout: "
                msg += "\n".join([line for line in stdout_watcher.poll_all()])
                raise RuntimeError(msg)
            time.sleep(self._retry_time_seconds)
        stdout_watcher.drop(True)
        stdout_watcher.suppress(not logger.isEnabledFor(logging.DEBUG))
        logger.info("Etcd is ready, endpoint is %s", self._etcd_endpoint)
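
    # For reference, with assumed default ports of 2379 (client) and 2380 (peer) the
    # etcd command assembled above resolves to roughly:
    #
    #   etcd --data-dir <instance workspace> \
    #        --listen-peer-urls http://0.0.0.0:2380 \
    #        --listen-client-urls http://0.0.0.0:2379 \
    #        --advertise-client-urls http://<local hostname>:2379 \
    #        --initial-cluster default=http://127.0.0.1:2380 \
    #        --initial-advertise-peer-urls http://127.0.0.1:2380
    #
    # with `python -m etcd_distro.etcd` substituted for `etcd` when no binary is
    # found on PATH (see find_etcd below).
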
hosts = [f"{host.split(':')[0]}:1" for host in self._hosts] if len(hosts) > 1: # Use MPI to start multiple process mpi_resolver = ResolveMPICmdPrefix() cmd, mpi_env = mpi_resolver.resolve(len(hosts), hosts) else: # Start single process without MPI cmd, mpi_env = [], {} cmd.extend([sys.executable, "-m", "vineyard"]) cmd.extend(["--socket", self.vineyard_socket]) cmd.extend(["--rpc_socket_port", str(self._vineyard_rpc_port)]) if len(hosts) > 1: # Launch etcd if not exists self.configure_etcd_endpoint() cmd.extend(["-etcd_endpoint", self._etcd_endpoint]) cmd.extend(["-etcd_prefix", f"vineyard.gsa.{ts}"]) else: cmd.extend(["--meta", "local"]) env = os.environ.copy() env["GLOG_v"] = str(self._glog_level) env.update(mpi_env) logger.info("Launch vineyardd with command: %s", " ".join(cmd)) logger.info("Server is initializing vineyardd.") process = self._popen_helper(cmd, cwd=os.getcwd(), env=env) stdout_watcher = PipeWatcher(process.stdout, sys.stdout, drop=False) setattr(process, "stdout_watcher", stdout_watcher) self._vineyardd_process = process start_time = time.time() if len(hosts) > 1: time.sleep(5 * self._retry_time_seconds) # should be OK else: while not os.path.exists(self._vineyard_socket): if self._vineyardd_process.poll() is not None: msg = "Launch vineyardd failed: " msg += "\n".join([line for line in stdout_watcher.poll_all()]) msg += "\nRerun with `graphscope.set_option(log_level='debug')`," msg += " to get verbose vineyardd logs." raise RuntimeError(msg) if self._timeout_seconds + start_time < time.time(): self._vineyardd_process.kill() # outs, _ = self._vineyardd_process.communicate() # logger.error("Start vineyardd timeout, %s", outs) raise RuntimeError("Launch vineyardd failed due to timeout.") time.sleep(self._retry_time_seconds) stdout_watcher.drop(True) stdout_watcher.suppress(not logger.isEnabledFor(logging.DEBUG)) logger.info( "Vineyardd is ready, ipc socket is %s, rpc port is %s", self._vineyard_socket, self._vineyard_rpc_port, ) def close_etcd(self): self._stop_subprocess(self._etcd_process) def close_vineyard(self): self._stop_subprocess(self._vineyardd_process, kill=True) self.close_etcd() @staticmethod def _stop_subprocess(proc, kill=False) -> None: if proc is not None: if kill: proc.kill() else: proc.terminate() def distribute_file(self, path) -> None: dir = os.path.dirname(path) for host in self.hosts: if host not in ("localhost", "127.0.0.1"): logger.debug(run_command(f"ssh {host} mkdir -p {dir}")) # noqa: G004 logger.debug(run_command(f"scp -r {path} {host}:{path}")) # noqa: G004 @staticmethod def find_etcd() -> List[str]: etcd = shutil.which("etcd") if etcd is None: etcd = [sys.executable, "-m", "etcd_distro.etcd"] else: etcd = [etcd] return etcd def configure_etcd_endpoint(self): if self._external_etcd_addr is None: self.launch_etcd() logger.info("etcd cluster created") else: self._etcd_endpoint = f"http://{self._external_etcd_addr}" logger.info("Using external etcd cluster") logger.info("etcd endpoint is %s", self._etcd_endpoint) def start(self): try: # create vineyard self.launch_vineyard() except Exception: # pylint: disable=broad-except time.sleep(1) logger.exception("Error when launching GraphScope on local") self.stop() return False return True def get_engine_config(self) -> dict: config = { "engine_hosts": ",".join(self._hosts), "mars_endpoint": None, } return config def get_vineyard_stream_info(self): return "ssh", self.hosts