coordinator/gscoordinator/coordinator.py (141 lines of code) (raw):

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Coordinator between client and engines"""

import argparse
import base64
import logging
import os
import signal
import sys
import threading
from concurrent import futures

import connexion
import grpc
from flask_cors import CORS

from graphscope.config import Config
from graphscope.proto import coordinator_service_pb2_grpc

from gscoordinator.flex.core.client_wrapper import initialize_client_wrapper
from gscoordinator.flex.core.config import SOLUTION
from gscoordinator.flex.encoder import JSONEncoder
from gscoordinator.monitor import Monitor
from gscoordinator.utils import human_readable_to_bytes

logger = logging.getLogger("graphscope")


def config_logging(log_level) -> None:
    """Configure the "graphscope" and "vineyard" loggers.

    Both loggers are set to ``log_level`` and share two stream handlers:
    records up to and including INFO are written to stdout, records at
    WARNING and above are written to stderr.

    Args:
        log_level (str | int): Log level of the stdout handler. A string is
            resolved case-insensitively as an attribute of the ``logging``
            module (e.g. ``"info"`` -> ``logging.INFO``).

    Raises:
        AttributeError: If ``log_level`` is a string that does not name an
            attribute of the ``logging`` module.
    """
    logging.basicConfig(level=logging.CRITICAL)

    # `NOTSET` is special as it doesn't show log in Python
    if isinstance(log_level, str):
        log_level = getattr(logging, log_level.upper())
    if log_level == logging.NOTSET:
        log_level = logging.DEBUG

    gs_logger = logging.getLogger("graphscope")
    gs_logger.setLevel(log_level)

    vineyard_logger = logging.getLogger("vineyard")
    vineyard_logger.setLevel(log_level)

    # stdout only receives records up to INFO; anything more severe is left
    # to the stderr handler so a record is never printed on both streams.
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(log_level)
    stdout_handler.addFilter(lambda record: record.levelno <= logging.INFO)

    stderr_handler = logging.StreamHandler(sys.stderr)
    stderr_handler.setLevel(logging.WARNING)

    formatter = logging.Formatter(
        "%(asctime)s [%(levelname)s][%(module)s:%(lineno)d]: %(message)s"
    )
    stdout_handler.setFormatter(formatter)
    stderr_handler.setFormatter(formatter)

    gs_logger.addHandler(stdout_handler)
    gs_logger.addHandler(stderr_handler)

    vineyard_logger.addHandler(stdout_handler)
    vineyard_logger.addHandler(stderr_handler)


def launch_graphscope() -> None:
    """Load the config given on the command line and start the coordinator.

    ``--config`` (base64 encoded JSON) takes precedence over
    ``--config-file``. Depending on ``config.coordinator.http_server_only``
    either only the HTTP service is started, or the gRPC server together
    with the HTTP service.

    Raises:
        RuntimeError: If neither ``--config`` nor ``--config-file`` is given.
    """
    args = parse_sys_args()
    if args.config:
        # Undecodable bytes are dropped (errors="ignore") rather than raising.
        config = base64.b64decode(args.config).decode("utf-8", errors="ignore")
        config = Config.loads_json(config)
    elif args.config_file:
        config = Config.load(args.config_file)
    else:
        raise RuntimeError("Must specify a config or config-file")

    config_logging(config.log_level)
    logger.info("Start server with args \n%s", config.dumps_yaml())

    if config.coordinator.http_server_only:
        start_http_service(config)
    else:
        servicer = get_servicer(config)
        start_server(servicer, config)


def parse_sys_args() -> argparse.Namespace:
    """Parse the ``--config`` and ``--config-file`` command line options.

    Returns:
        The parsed namespace with ``config`` and ``config_file`` attributes;
        an option that was not supplied is ``None``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--config",
        type=str,
        help="The base64 encoded config in json format.",
    )
    parser.add_argument(
        "--config-file", type=str, help="The config file path in yaml or json format"
    )
    # parser.add_arguments(Config, dest="gs")
    return parser.parse_args()


def get_servicer(config: Config):
    """Get servicer of specified solution under FLEX architecture

    Args:
        config: Coordinator configuration; ``config.solution`` selects the
            servicer initializer.

    Returns:
        Whatever the initializer registered for ``config.solution`` returns
        when called with ``config``.

    Raises:
        RuntimeError: If ``config.solution`` has no registered initializer.
    """
    # Imported here rather than at module level, as in the original code.
    from gscoordinator.servicer import init_graphscope_one_service_servicer

    service_initializers = {
        "GraphScope One": init_graphscope_one_service_servicer,
    }

    initializer = service_initializers.get(config.solution)
    if initializer is None:
        # List the supported names explicitly (a raw `dict.keys()` renders as
        # "dict_keys([...])") and echo the rejected value to ease debugging.
        raise RuntimeError(
            f"Expect one of {list(service_initializers)} of solution parameter, "
            f"but got {config.solution!r}"
        )

    return initializer(config)


def start_http_service(config) -> None:
    """Build the connexion application for the FLEX HTTP API and run it.

    Initializes the client wrapper, loads ``openapi.yaml`` from
    ``./flex/openapi/``, limits the request body size to
    ``config.coordinator.max_content_length``, enables CORS and finally runs
    the app on ``config.coordinator.http_port``.

    Args:
        config: Coordinator configuration.
    """
    initialize_client_wrapper(config)
    app = connexion.App(__name__, specification_dir="./flex/openapi/")
    app.app.json_encoder = JSONEncoder
    app.app.config["MAX_CONTENT_LENGTH"] = human_readable_to_bytes(
        config.coordinator.max_content_length
    )
    app.add_api(
        "openapi.yaml",
        arguments={"title": "GraphScope FLEX HTTP SERVICE API"},
        pythonic_params=True,
    )
    # support cross origin.
    CORS(app.app)
    app.run(port=config.coordinator.http_port)


def start_server(
    coordinator_service_servicer,  # :coordinator_service_pb2_grpc.CoordinatorServiceServicer,
    config: Config,
) -> None:
    """Start the gRPC coordinator server and its companion services.

    In order: registers the servicer on a new gRPC server listening on
    ``0.0.0.0:<service_port>``, starts the HTTP service in a daemon thread,
    optionally starts the monitor server, installs a SIGTERM handler and then
    waits for the gRPC server to terminate.

    Args:
        coordinator_service_servicer: Servicer to register on the gRPC
            server; its ``cleanup()`` method is called on shutdown.
        config: Coordinator configuration.
    """
    # register gRPC server
    from gscoordinator.utils import GS_GRPC_MAX_MESSAGE_LENGTH

    server = grpc.server(
        futures.ThreadPoolExecutor(max(4, os.cpu_count() or 1)),
        options=[
            ("grpc.max_send_message_length", GS_GRPC_MAX_MESSAGE_LENGTH),
            ("grpc.max_receive_message_length", GS_GRPC_MAX_MESSAGE_LENGTH),
            ("grpc.max_metadata_size", GS_GRPC_MAX_MESSAGE_LENGTH),
        ],
    )
    coordinator_service_pb2_grpc.add_CoordinatorServiceServicer_to_server(
        coordinator_service_servicer, server
    )
    endpoint = f"0.0.0.0:{config.coordinator.service_port}"
    server.add_insecure_port(endpoint)

    logger.info("Coordinator server listen at %s", endpoint)

    server.start()

    # OpenApi server
    # Daemon thread: it must not keep the process alive once the gRPC server
    # has terminated.
    httpservice_t = threading.Thread(target=start_http_service, args=(config,))
    httpservice_t.daemon = True
    httpservice_t.start()

    if config.coordinator.monitor:
        try:
            Monitor.startServer(config.coordinator.monitor_port, "127.0.0.1")
            logger.info(
                "Coordinator monitor server listen at 127.0.0.1:%d",
                config.coordinator.monitor_port,
            )
        except Exception:  # noqa: E722, pylint: disable=broad-except
            # Best effort: a monitor failure is logged but does not stop the
            # coordinator.
            logger.exception(
                "Failed to start monitor server on '127.0.0.1:%s'",
                config.coordinator.monitor_port,
            )

    # handle SIGTERM signal
    def terminate(signum, frame):
        # NOTE(review): grpc's `stop()` takes a grace period in seconds (or
        # None); `True` presumably acts as 1 second here - confirm.
        server.stop(True)
        coordinator_service_servicer.cleanup()

    signal.signal(signal.SIGTERM, terminate)

    try:
        # GRPC has handled SIGINT
        server.wait_for_termination()
    except KeyboardInterrupt:
        coordinator_service_servicer.cleanup()
        server.stop(True)


if __name__ == "__main__":
    launch_graphscope()