odps/mars_extension/oscar/deploy/gscoordinator.py (73 lines of code) (raw):

# !/usr/bin/env python # -*- coding: utf-8 -*- # Copyright 1999-2022 Alibaba Group Holding Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import base64 import sys import json import socket import subprocess from .client import GS_COORDINATOR_NAME DEFAULT_GS_COORDINATOR_PORT = 63800 DEFAULT_GS_COORDINATOR_GATEWAY_PORT = 64800 def start_coordinator(args, coordinator_port, vineyard_socket): subprocess.Popen( [ sys.executable, "-m", "gscoordinator", "--enable_k8s", "false", "--hosts", ",".join(args["worker_pod_ip_list"]), "--port", str(coordinator_port), "--num_workers", str(args["num_workers"]), "--vineyard_socket", vineyard_socket, "--log_level", "DEBUG", "--timeout_seconds", "-1", ], stdout=sys.__stdout__, stderr=sys.__stdout__, bufsize=1, ) def start_coordinator_gateway(args, coordinator_port, coordinator_gateway_port): subprocess.Popen( [ "/usr/local/bin/graphscope-gateway", "-grpc-server-endpoint", "localhost:%s" % coordinator_port, "--gateway-address", ":%s" % coordinator_gateway_port, "-v", "100", "-alsologtostderr", ], stdout=sys.__stdout__, stderr=sys.__stdout__, bufsize=1, ) def _main(): argv = sys.argv[1] args_dict = json.loads(base64.b64decode(argv).decode()) print("launch graphscope:", args_dict) from cupid import context cupid_context = context() host_addr = socket.gethostbyname(socket.gethostname()) os.environ.pop("KUBE_API_ADDRESS") coordinator_port = args_dict.get("port", None) or DEFAULT_GS_COORDINATOR_PORT coordinator_gateway_port = ( args_dict.get("gateway_port", None) or DEFAULT_GS_COORDINATOR_GATEWAY_PORT ) endpoint = "http://{0}:{1}".format(host_addr, coordinator_gateway_port) kvstore = cupid_context.kv_store() kvstore[GS_COORDINATOR_NAME] = json.dumps(dict(endpoint=endpoint)) # start coordinator vineyard_socket = os.environ.get("VINEYARD_IPC_SOCKET", "/tmp/vineyard.sock") start_coordinator(args_dict, coordinator_port, vineyard_socket) start_coordinator_gateway(args_dict, coordinator_port, coordinator_gateway_port) # modify in hyper mode if os.environ.get("VM_ENGINE_TYPE") == "hyper": endpoint = socket.gethostname() + "-{}".format(coordinator_port) cupid_context.register_application(GS_COORDINATOR_NAME, endpoint) if __name__ == "__main__": _main()