python/graphscope/deploy/hosts/cluster.py (94 lines of code) (raw):

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import base64
import copy
import logging
import os
import signal
import subprocess
import sys

import graphscope
from graphscope.config import Config
from graphscope.deploy.launcher import Launcher
from graphscope.framework.utils import PipeWatcher
from graphscope.framework.utils import get_free_port
from graphscope.framework.utils import in_notebook
from graphscope.framework.utils import is_free_port

try:
    import gscoordinator

    COORDINATOR_HOME = os.path.abspath(os.path.join(gscoordinator.__file__, "..", ".."))
except ModuleNotFoundError:
    # If gscoordinator is not installed, try to locate it by relative path,
    # which is strongly tied to the directory structure of GraphScope.
    COORDINATOR_HOME = os.path.abspath(
        os.path.join(__file__, "..", "..", "..", "..", "..", "coordinator")
    )

logger = logging.getLogger("graphscope")


class HostsClusterLauncher(Launcher):
    """Class for setting up GraphScope instance on hosts cluster"""

    def __init__(self, config: Config):
        """Prepare the launcher without starting any process.

        Args:
            config: Session configuration. It is deep-copied, so the port
                adjustment below never mutates the caller's object.
        """
        self._config = copy.deepcopy(config)
        self._proc = None
        port = self._config.coordinator.service_port
        if not is_free_port(port):
            # The configured port is taken; fall back to any free port and
            # record it in our private copy of the config.
            port = get_free_port()
            self._config.coordinator.service_port = port
        self._coordinator_endpoint = f"{self._config.hosts_launcher.hosts[0]}:{port}"

    def poll(self):
        """Return the coordinator's ``Popen.poll()`` result.

        Returns:
            ``None`` while the coordinator process is running, its exit code
            once it has terminated, or ``-1`` when no process is attached to
            this launcher.
        """
        if self._proc is not None:
            return self._proc.poll()
        return -1

    def base64_encode(self, string):
        """Return the base64 text encoding of the UTF-8 bytes of ``string``."""
        return base64.b64encode(string.encode("utf-8")).decode("utf-8", errors="ignore")

    def _launch_coordinator(self):
        """Spawn ``python -m gscoordinator`` and attach output watchers.

        The configuration is passed on the command line as base64-encoded
        JSON. The resulting ``Popen`` object is stored in ``self._proc``.
        """
        cmd = [
            sys.executable,
            "-m",
            "gscoordinator",
            "--config",
            self.base64_encode(self._config.dumps_json()),
        ]
        # logger.info("Initializing coordinator with command: %s", " ".join(cmd))

        env = os.environ.copy()
        env["PYTHONUNBUFFERED"] = "TRUE"
        # add graphscope module to PYTHONPATH
        graphscope_dir = os.path.join(os.path.dirname(graphscope.__file__), "..")
        coordinator_dir = os.path.join(graphscope_dir, "..", "coordinator")
        additional_path = graphscope_dir + os.pathsep + coordinator_dir
        if "PYTHONPATH" in env:
            # Prepend so these directories take precedence over existing entries.
            env["PYTHONPATH"] = additional_path + os.pathsep + env["PYTHONPATH"]
        else:
            env["PYTHONPATH"] = additional_path

        # Param `start_new_session=True` is for putting child process to a new process group
        # so it won't get the signals from parent.
        # In notebook environment, we need to accept the signal from kernel restarted/stopped.
        process = subprocess.Popen(
            cmd,
            start_new_session=not in_notebook(),
            cwd=os.getcwd(),
            env=env,
            encoding="utf-8",
            errors="replace",
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            bufsize=1,
        )
        stdout_watcher = PipeWatcher(process.stdout, sys.stdout)
        if not self._config.show_log:
            # NOTE(review): presumably this filter targets progress-bar lines
            # ("Loading ... it/s]") when logs are hidden -- confirm against
            # PipeWatcher.add_filter semantics.
            stdout_watcher.add_filter(
                lambda line: "Loading" in line and "it/s]" in line
            )
        # Keep the watchers reachable from the process object.
        setattr(process, "stdout_watcher", stdout_watcher)
        stderr_watcher = PipeWatcher(process.stderr, sys.stderr)
        setattr(process, "stderr_watcher", stderr_watcher)
        self._proc = process

    def type(self):
        """Return the launcher type identifier, ``"hosts"``."""
        return "hosts"

    def start(self):
        """Launch graphscope instance on hosts cluster.

        This method returns nothing; the spawned process is kept in
        ``self._proc`` and the endpoint computed at construction time in
        ``self._coordinator_endpoint``.

        Raises:
            RuntimeError: If the coordinator process could not be launched.
        """
        try:
            self._launch_coordinator()
            logger.info(
                "Coordinator service started successful, connecting to service..."
            )
        except Exception as e:
            # Clean up whatever was started before surfacing the failure.
            self.stop()
            raise RuntimeError(
                "Error when launching coordinator on hosts cluster"
            ) from e

    def stop(self, wait=False):
        """Stop GraphScope instance.

        Sends SIGINT to the coordinator and waits up to 10 seconds for it to
        exit. If it is still alive after that, it is killed and reaped so no
        process is leaked. ``self._proc`` is always cleared afterwards.

        Args:
            wait: Not used by this launcher.
        """
        proc = self._proc
        if proc is None:
            return
        try:
            # coordinator's GRPCServer.wait_for_termination works for SIGINT (Ctrl-C)
            proc.send_signal(signal.SIGINT)
            proc.wait(timeout=10)
        except subprocess.TimeoutExpired:
            # Graceful shutdown did not finish in time: force termination
            # rather than leaving an orphaned coordinator behind.
            logger.warning(
                "Coordinator did not exit within 10 seconds after SIGINT, killing it."
            )
            proc.kill()
            proc.wait()
        finally:
            self._proc = None