maga_transformer/start_server.py (121 lines of code) (raw):

import os
import sys
import json
import time
import logging
import logging.config
import traceback
import requests
import signal
import multiprocessing
from typing import Generator, Union, Any, Dict, List

CUR_PATH = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.join(str(CUR_PATH), '..'))

from maga_transformer.config.log_config import LOGGING_CONFIG
from maga_transformer.distribute.worker_info import WorkerInfo, DEFAULT_START_PORT
from maga_transformer.start_backend_server import start_backend_server
from maga_transformer.start_frontend_server import start_frontend_server
from maga_transformer.utils.concurrency_controller import ConcurrencyController, init_controller


def check_server_health(server_port):
    """Probe ``http://localhost:<server_port>/health`` once.

    Args:
        server_port: local TCP port to query.

    Returns:
        True only when the endpoint answers HTTP 200 with the body ``"ok"``
        (a JSON-encoded string; surrounding whitespace is ignored), False
        otherwise. Request failures (connection refused, 5 s timeout, ...)
        are logged and reported as False rather than raised.
    """
    try:
        response = requests.get(f'http://localhost:{server_port}/health', timeout=5)
        logging.info(f"response status_code = {response.status_code}, text = {response.text}, len = {len(response.text)}")
        if response.status_code == 200 and response.text.strip() == '"ok"':
            return True
        logging.info("health check is not ready")
        return False
    except Exception as e:
        # Deliberately not BaseException: Ctrl-C / SystemExit must still
        # be able to interrupt the startup polling loops.
        logging.info(f"health check is not ready, {str(e)}")
        return False


def start_backend_server_impl(global_controller):
    """Spawn the backend server process and block until it is healthy.

    The backend is polled every 5 seconds on the port returned by
    ``WorkerInfo.backend_server_port_offset(0, START_PORT)``.

    Args:
        global_controller: controller object passed through to
            ``start_backend_server`` in the child process.

    Returns:
        The started ``multiprocessing.Process`` for the backend.

    Raises:
        Exception: if the backend process dies before becoming healthy.
    """
    # Debug only: run the backend in this process and hard-exit when it returns.
    if os.environ.get('DEBUG_LOAD_SERVER', None) == '1':
        start_backend_server(global_controller)
        os._exit(-1)

    backend_process = multiprocessing.Process(target=start_backend_server, args=(global_controller,), name="backend_server")
    backend_process.start()

    retry_interval_seconds = 5
    start_port = int(os.environ.get('START_PORT', DEFAULT_START_PORT))
    backend_server_port = WorkerInfo.backend_server_port_offset(0, start_port)

    while True:
        if not backend_process.is_alive():
            monitor_and_release_process(backend_process, None)
            raise Exception("backend server is not alive")
        # check_server_health never raises for ordinary errors; it returns False.
        if check_server_health(backend_server_port):
            logging.info('backend server is ready')
            break
        time.sleep(retry_interval_seconds)
    return backend_process


def start_frontend_server_impl(global_controller, backend_process):
    """Spawn the frontend server processes and block until they are healthy.

    Starts ``FRONTEND_SERVER_COUNT`` (default 4) processes, each running
    ``start_frontend_server(i, global_controller)``. It then polls the
    health endpoint on ``START_PORT`` every 5 seconds until it reports
    ready.

    Args:
        global_controller: controller object passed to every frontend.
        backend_process: the already-running backend process; it is torn
            down together with the frontends if any frontend dies.

    Returns:
        List of the started frontend ``multiprocessing.Process`` objects.
        The list is empty when FRONTEND_SERVER_COUNT < 1.

    Raises:
        Exception: if any frontend process dies before the health check passes.
    """
    frontend_server_count = int(os.environ.get('FRONTEND_SERVER_COUNT', 4))
    if frontend_server_count < 1:
        logging.info(f"frontend server's count is {frontend_server_count}, this may be a mistake")

    frontend_processes = []
    for i in range(frontend_server_count):
        # Set before start() so the child presumably inherits this id via
        # its environment; the last value remains set in this process.
        os.environ['FRONTEND_SERVER_ID'] = str(i)
        process = multiprocessing.Process(target=start_frontend_server, args=(i, global_controller), name=f"frontend_server_{i}")
        frontend_processes.append(process)
        process.start()

    if not frontend_processes:
        # Nothing was started, so nothing can ever answer the health check;
        # waiting would loop forever.
        return frontend_processes

    retry_interval_seconds = 5
    start_port = int(os.environ.get('START_PORT', DEFAULT_START_PORT))
    while True:
        if not all(proc.is_alive() for proc in frontend_processes):
            monitor_and_release_process(backend_process, frontend_processes)
            raise Exception("frontend server is not alive")
        # Previously the return value was ignored, so the loop exited after
        # the first probe even when the frontend was not up yet.
        if check_server_health(start_port):
            logging.info('frontend server is ready')
            break
        # If the connection fails, wait for a while and then retry.
        time.sleep(retry_interval_seconds)
    return frontend_processes


def main():
    """Launch the backend, then the frontends, then supervise them.

    The ``finally`` clause always hands the started processes to
    ``monitor_and_release_process``. On success it blocks there until the
    processes exit. On a startup failure it cleans up whatever was started.
    """
    try:
        multiprocessing.set_start_method('spawn')
    except RuntimeError as e:
        # The start method was already set (e.g. by an importer); keep going.
        logging.warning(str(e))

    global_controller = init_controller()
    backend_process = None
    frontend_process = None
    try:
        logging.info("start backend server")
        backend_process = start_backend_server_impl(global_controller)
        logging.info(f"backend server process = {backend_process}")

        logging.info("start frontend server")
        frontend_process = start_frontend_server_impl(global_controller, backend_process)
        logging.info(f"frontend server process = {frontend_process}")
    finally:
        monitor_and_release_process(backend_process, frontend_process)


def monitor_and_release_process(backend_process, frontend_process):
    """Watch the server processes until all of them have exited, then join them.

    The processes are polled once per second. As soon as any watched
    process is found dead while others are still alive, this function:

    * sends SIGTERM to the backend's process group, and
    * calls ``terminate()`` on every watched process.

    It repeats that on each poll until nothing is alive.

    Args:
        backend_process: backend ``multiprocessing.Process`` or None.
        frontend_process: list of frontend processes, or None.
    """
    all_process = []
    if backend_process is not None:
        all_process.append(backend_process)
    if frontend_process:
        all_process.extend(frontend_process)
    logging.info(f"all process = {all_process}")

    while any(proc.is_alive() for proc in all_process):
        if not all(proc.is_alive() for proc in all_process):
            logging.error('server monitor : some process is not alive, exit!')
            if backend_process is not None:
                try:
                    # NOTE(review): if the backend did not move itself into its
                    # own process group (setpgrp/setsid), this group is shared
                    # with the launcher and SIGTERM reaches this process too -
                    # confirm that is intended.
                    os.killpg(os.getpgid(backend_process.pid), signal.SIGTERM)
                except Exception as e:
                    logging.error(f"catch exception when kill backend process : {str(e)}")
            for proc in all_process:
                try:
                    proc.terminate()
                except Exception as e:
                    logging.error(f"catch exception when process terminate : {str(e)}")
        time.sleep(1)

    for proc in all_process:
        proc.join()
    logging.info("all process exit")


if __name__ == '__main__':
    os.makedirs('logs', exist_ok=True)
    main()