coordinator/gscoordinator/launcher.py (136 lines of code) (raw):
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import os
import platform
from abc import ABCMeta
from abc import abstractmethod
from typing import List
from gscoordinator.utils import GRAPHSCOPE_HOME
logger = logging.getLogger("graphscope")
def configure_environ():
# add `${GRAPHSCOPE_HOME}/bin` to ${PATH}
os.environ["PATH"] += os.pathsep + os.path.join(GRAPHSCOPE_HOME, "bin")
# OPAL_PREFIX for openmpi
opal_prefix = None
if os.path.isdir(os.path.join(GRAPHSCOPE_HOME, "openmpi")):
opal_prefix = os.path.join(GRAPHSCOPE_HOME, "openmpi")
if os.path.isdir(os.path.join("/opt", "openmpi")):
opal_prefix = os.path.join("/opt", "openmpi")
# Darwin is open-mpi
if os.path.isdir(os.path.join(GRAPHSCOPE_HOME, "open-mpi")):
opal_prefix = os.path.join(GRAPHSCOPE_HOME, "open-mpi")
if opal_prefix is None:
logger.info(
"Failed to resolve the openmpi path, moving towards the system-wide one"
)
else:
os.environ["OPAL_PREFIX"] = opal_prefix
if platform.system() == "Darwin":
# requires on macOS, but break Kubernetes tests on Linux
os.environ["OPAL_BINDIR"] = os.path.join(opal_prefix, "bin")
os.environ["OPAL_LIBDIR"] = os.path.join(opal_prefix, "lib")
os.environ["OPAL_DATADIR"] = os.path.join(opal_prefix, "share")
os.environ["OMPI_MCA_mca_base_component_path"] = os.path.join(
opal_prefix, "lib", "openmpi"
)
# add '${GRAPHSCOPE_HOME}/lib' to ${LD_LIBRARY_PATH} to find libvineyard_internal_registry.so(dylib)
if "LD_LIBRARY_PATH" in os.environ:
os.environ["LD_LIBRARY_PATH"] = (
os.path.join(GRAPHSCOPE_HOME, "lib")
+ os.pathsep
+ os.environ["LD_LIBRARY_PATH"]
)
else:
os.environ["LD_LIBRARY_PATH"] = os.path.join(GRAPHSCOPE_HOME, "lib")
if "DYLD_LIBRARY_PATH" in os.environ:
os.environ["DYLD_LIBRARY_PATH"] = (
os.path.join(GRAPHSCOPE_HOME, "lib")
+ os.pathsep
+ os.environ["DYLD_LIBRARY_PATH"]
)
else:
os.environ["DYLD_LIBRARY_PATH"] = os.path.join(GRAPHSCOPE_HOME, "lib")
class AbstractLauncher(metaclass=ABCMeta):
def __init__(self):
self._instance_id: str = None
self._num_workers: int = None
self._hosts: List[str] = []
self._analytical_engine_endpoint: str = None
self._vineyard_endpoint: str = None
self._vineyard_socket: str = None
self._session_workspace: str = None
configure_environ()
@abstractmethod
def create_analytical_instance(self):
pass
@abstractmethod
def create_interactive_instance(
self, object_id: int, schema_path: str, params: dict, with_cypher: bool
):
pass
@abstractmethod
def create_learning_instance(
self, object_id: int, handle: str, config: str, learning_backend: int
):
pass
@abstractmethod
def close_analytical_instance(self):
pass
@abstractmethod
def close_interactive_instance(self, object_id: int):
pass
@abstractmethod
def close_learning_instance(self, object_id: int, learning_backend: int):
pass
@abstractmethod
def launch_etcd(self):
pass
@abstractmethod
def launch_vineyard(self):
pass
@abstractmethod
def close_etcd(self):
pass
@abstractmethod
def close_vineyard(self):
pass
@abstractmethod
def configure_etcd_endpoint(self):
pass
@abstractmethod
def get_engine_config(self) -> dict:
pass
@abstractmethod
def get_vineyard_stream_info(self):
pass
@abstractmethod
def distribute_file(self, path):
pass
@property
def vineyard_endpoint(self) -> str:
return self._vineyard_endpoint
@property
def vineyard_socket(self) -> str:
return self._vineyard_socket
@property
def analytical_engine_endpoint(self) -> str:
return self._analytical_engine_endpoint
@property
def num_workers(self) -> int:
if self._num_workers is None:
raise RuntimeError("Get None value of workers number.")
return int(self._num_workers)
@property
def instance_id(self) -> str:
return self._instance_id
@property
def hosts(self) -> List[str]:
return self._hosts
@abstractmethod
def type(self):
pass
@abstractmethod
def start(self):
pass
@abstractmethod
def stop(self, is_dangling=False):
pass
@abstractmethod
def set_session_workspace(self, session_id: str):
pass
def get_namespace(self) -> str:
pass