coordinator/gscoordinator/utils.py (1,695 lines of code) (raw):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import copy
import datetime
import functools
import glob
import hashlib
import inspect
import json
import logging
import os
import shlex
import shutil
import subprocess
import sys
import time
import traceback
import uuid
import zipfile
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
from queue import Queue
from string import Template
from typing import List
import grpc
import yaml
from google.protobuf.any_pb2 import Any
from graphscope.framework import utils
from graphscope.framework.errors import CompilationError
from graphscope.framework.graph_schema import GraphSchema
from graphscope.framework.utils import PipeWatcher
from graphscope.framework.utils import find_java_exe
from graphscope.framework.utils import get_platform_info
from graphscope.framework.utils import get_tempdir
from graphscope.proto import attr_value_pb2
from graphscope.proto import data_types_pb2
from graphscope.proto import graph_def_pb2
from graphscope.proto import op_def_pb2
from graphscope.proto import types_pb2
from gscoordinator.constants import ANALYTICAL_CONTAINER_NAME
from gscoordinator.version import __version__
logger = logging.getLogger("graphscope")
RESOURCE_DIR_NAME = "resource"
# runtime workspace
try:
WORKSPACE = os.environ["GRAPHSCOPE_RUNTIME"]
except KeyError:
WORKSPACE = os.path.join(get_tempdir(), "gs")
# make sure we have permission to create instance workspace
try:
os.makedirs(os.path.join(WORKSPACE, ".ignore"), exist_ok=True)
except: # noqa: E722, pylint: disable=bare-except
WORKSPACE = os.path.expanduser("~/.graphscope/gs")
# COORDINATOR_HOME
# 1) get from the gscoordinator python module; if that fails,
# 2) infer from the current directory
try:
import gscoordinator
COORDINATOR_HOME = os.path.abspath(os.path.join(gscoordinator.__file__, "..", ".."))
except ModuleNotFoundError:
COORDINATOR_HOME = os.path.abspath(os.path.join(__file__, "..", ".."))
# GRAPHSCOPE_HOME
# 1) get from the environment variable `GRAPHSCOPE_HOME`; if it is not set,
# 2) infer from COORDINATOR_HOME
GRAPHSCOPE_HOME = os.environ.get("GRAPHSCOPE_HOME", None)
if GRAPHSCOPE_HOME is None:
# Note: The order of locations matters
possible_locations = [
os.path.join(COORDINATOR_HOME, "graphscope.runtime"), # installed by pip
"/opt/graphscope", # installed by gs script
"/usr/local", # a popular location
]
for location in possible_locations:
ANALYTICAL_ENGINE_PATH = os.path.join(location, "bin", "grape_engine")
if os.path.isfile(ANALYTICAL_ENGINE_PATH):
GRAPHSCOPE_HOME = location
break
if GRAPHSCOPE_HOME is not None:
ANALYTICAL_ENGINE_HOME = GRAPHSCOPE_HOME
ANALYTICAL_ENGINE_PATH = os.path.join(GRAPHSCOPE_HOME, "bin", "grape_engine")
INTERACTIVE_ENGINE_SCRIPT = os.path.join(GRAPHSCOPE_HOME, "bin", "giectl")
else:
    # resolve from the develop source tree
    # Here GRAPHSCOPE_HOME is set to the root of the source tree,
    # so the engine location doesn't need to be checked again;
    # just rely on GRAPHSCOPE_HOME.
GRAPHSCOPE_HOME = os.path.join(COORDINATOR_HOME, "..")
ANALYTICAL_ENGINE_HOME = os.path.join(GRAPHSCOPE_HOME, "analytical_engine")
ANALYTICAL_ENGINE_PATH = os.path.join(
ANALYTICAL_ENGINE_HOME, "build", "grape_engine"
)
INTERACTIVE_ENGINE_SCRIPT = os.path.join(
GRAPHSCOPE_HOME,
"interactive_engine",
"assembly",
"src",
"bin",
"graphscope",
"giectl",
)
# template directory for code generation
TEMPLATE_DIR = os.path.join(COORDINATOR_HOME, "gscoordinator", "template")
# builtin app resource
BUILTIN_APP_RESOURCE_PATH = os.path.join(
COORDINATOR_HOME, "gscoordinator", "builtin", "app", "builtin_app.gar"
)
# default config file in gar resource
DEFAULT_GS_CONFIG_FILE = ".gs_conf.yaml"
ANALYTICAL_BUILTIN_SPACE = os.path.join(GRAPHSCOPE_HOME, "precompiled", "builtin")
# ANALYTICAL_ENGINE_JAVA_HOME
ANALYTICAL_ENGINE_JAVA_HOME = ANALYTICAL_ENGINE_HOME
ANALYTICAL_ENGINE_JAVA_RUNTIME_JAR = os.path.join(
ANALYTICAL_ENGINE_JAVA_HOME,
"lib",
f"grape-runtime-{__version__}-shaded.jar",
)
ANALYTICAL_ENGINE_JAVA_GIRAPH_JAR = os.path.join(
ANALYTICAL_ENGINE_JAVA_HOME,
"lib",
f"grape-giraph-{__version__}-shaded.jar",
)
ANALYTICAL_ENGINE_JAVA_INIT_CLASS_PATH = (
f"{ANALYTICAL_ENGINE_JAVA_RUNTIME_JAR}:{ANALYTICAL_ENGINE_JAVA_GIRAPH_JAR}"
)
ANALYTICAL_ENGINE_JAVA_JVM_OPTS = f"-Djava.library.path={GRAPHSCOPE_HOME}/lib"
ANALYTICAL_ENGINE_JAVA_JVM_OPTS += (
f" -Djava.class.path={ANALYTICAL_ENGINE_JAVA_INIT_CLASS_PATH}"
)
# JAVA SDK related CONSTANTS
LLVM4JNI_HOME = os.environ.get("LLVM4JNI_HOME", None)
LLVM4JNI_USER_OUT_DIR_BASE = "user-llvm4jni-output"
PROCESSOR_MAIN_CLASS = "com.alibaba.graphscope.annotation.Main"
JAVA_CODEGEN_OUTPUT_PREFIX = "gs-ffi"
GRAPE_PROCESSOR_JAR = os.path.join(
GRAPHSCOPE_HOME, "lib", f"grape-runtime-{__version__}-shaded.jar"
)
GIRAPH_DRIVER_CLASS = "com.alibaba.graphscope.app.GiraphComputationAdaptor"
# increase grpc max message size to 2 GB - 1 bytes (the int32 maximum)
GS_GRPC_MAX_MESSAGE_LENGTH = 2 * 1024 * 1024 * 1024 - 1
# INTERACTIVE_ENGINE_SCRIPT
INTERACTIVE_INSTANCE_TIMEOUT_SECONDS = 120 # 2 mins
# default threads per worker configuration for GIE/GAIA
INTERACTIVE_ENGINE_THREADS_PER_WORKER = 2
def catch_unknown_errors(response_on_error=None, using_yield=False):
"""A catcher that catches all (unknown) exceptions in gRPC handlers to ensure
the client not think the coordinator services is crashed.
"""
def catch_exceptions(handler):
@functools.wraps(handler)
def handler_execution(self, request, context):
try:
if using_yield:
for result in handler(self, request, context):
yield result
else:
yield handler(self, request, context)
except Exception as exc:
error_message = repr(exc)
error_traceback = traceback.format_exc()
context.set_code(grpc.StatusCode.ABORTED)
context.set_details(
'Error occurs in handler: "%s", with traceback: ' % error_message
+ error_traceback
)
if response_on_error is not None:
yield response_on_error
return handler_execution
return catch_exceptions
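# Illustrative usage of the decorator above (a minimal sketch; the servicer class,
# message type and method names are hypothetical, not part of this module):
#
#   class CoordinatorServiceServicer(coordinator_service_pb2_grpc.CoordinatorServiceServicer):
#       @catch_unknown_errors(response_on_error=message_pb2.RunStepResponse(), using_yield=True)
#       def RunStep(self, request_iterator, context):
#           for response in self._run_step_impl(request_iterator, context):
#               yield response
#
# On an unexpected exception, the client sees an ABORTED status with the
# traceback in the details instead of a dropped connection.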
def get_timestamp() -> float:
return datetime.datetime.timestamp(datetime.datetime.now())
def get_lib_path(app_dir: str, app_name: str) -> str:
if sys.platform == "linux" or sys.platform == "linux2":
return os.path.join(app_dir, "lib%s.so" % app_name)
elif sys.platform == "darwin":
return os.path.join(app_dir, "lib%s.dylib" % app_name)
else:
raise RuntimeError(f"Unsupported platform {sys.platform}")
def get_app_sha256(attr, java_class_path: str):
(
app_type,
app_header,
app_class,
vd_type,
_,
_,
java_jar_path,
java_app_class,
) = _codegen_app_info(attr, DEFAULT_GS_CONFIG_FILE, java_class_path)
graph_header, graph_type, _, _ = _codegen_graph_info(attr)
logger.info(
"app type: %s (%s), graph type: %s (%s)",
app_class,
app_header,
graph_type,
graph_header,
)
if app_type == "cpp_pie":
app_sha256 = hashlib.sha256(
f"{app_type}.{app_class}.{graph_type}".encode("utf-8", errors="ignore")
).hexdigest()
elif app_type == "java_pie" or app_type == "giraph":
s = hashlib.sha256()
s.update(f"{graph_type}.{vd_type}".encode("utf-8", errors="ignore"))
app_sha256 = s.hexdigest()
logger.info(
"app sha256 for app %s with graph %s:%s, is %s",
java_app_class,
app_type,
java_app_class,
app_sha256,
)
else:
s = hashlib.sha256()
s.update(
f"{app_type}.{app_class}.{graph_type}".encode("utf-8", errors="ignore")
)
if types_pb2.GAR in attr:
s.update(attr[types_pb2.GAR].s)
app_sha256 = s.hexdigest()
return app_sha256
def get_graph_sha256(attr):
_, graph_class, _, _ = _codegen_graph_info(attr)
return hashlib.sha256(graph_class.encode("utf-8", errors="ignore")).hexdigest()
def check_java_app_graph_consistency(
app_class, cpp_graph_type, java_class_template_str
):
splited = cpp_graph_type.split("<")
java_app_type_params = java_class_template_str[:-1].split("<")[-1].split(",")
if splited[0] == "vineyard::ArrowFragment":
if app_class.find("Property") == -1:
raise RuntimeError(
"Expected property app, inconsistent app and graph {}, {}".format(
app_class, cpp_graph_type
)
)
if len(java_app_type_params) != 1:
raise RuntimeError("Expected 4 type params in java app")
if splited[0] == "gs::ArrowProjectedFragment":
if app_class.find("Projected") == -1:
raise RuntimeError(
"Expected Projected app, inconsistent app and graph {}, {}".format(
app_class, cpp_graph_type
)
)
if len(java_app_type_params) != 4:
raise RuntimeError("Expected 4 type params in java app")
graph_actual_type_params = splited[1][:-1].split(",")
for i in range(0, len(java_app_type_params)):
graph_actual_type_param = graph_actual_type_params[i]
java_app_type_param = java_app_type_params[i]
if not _type_param_consistent(graph_actual_type_param, java_app_type_param):
raise RuntimeError(
"Error in check app and graph consistency, type params index {}, cpp: {}, java: {}".format(
i, graph_actual_type_param, java_app_type_param
)
)
return True
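# Illustrative example (the app class and type strings below are hypothetical):
# a projected-fragment app must declare 4 Java type parameters that line up with
# the first four C++ template arguments (oid_t, vid_t, vdata_t, edata_t).
#
#   check_java_app_graph_consistency(
#       "gs::JavaPIEProjectedDefaultApp<_GRAPH_TYPE>",
#       "gs::ArrowProjectedFragment<int64_t,uint64_t,double,double>",
#       "com.example.MyProjectedApp<java.lang.Long,java.lang.Long,java.lang.Double,java.lang.Double>",
#   )  # returns True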
def check_giraph_app_graph_consistency(
app_class, cpp_graph_type, java_class_template_str
):
# Split the C++ graph type to get the parameters
split_cpp_graph_type = cpp_graph_type.split("<")
java_app_type_params = java_class_template_str[:-1].split("<")[-1].split(",")
# Ensure the graph type is supported
if split_cpp_graph_type[0] != "gs::ArrowProjectedFragment":
raise RuntimeError("Giraph app only supports projected graph")
# Extract actual type parameters from the graph
graph_actual_type_params = split_cpp_graph_type[1][:-1].split(",")
    # Define the expected mapping between graph and app type parameters
    # (cpp index -> java index): oid_t, vdata_t and edata_t respectively.
    index_mapping = {0: 0, 2: 1, 3: 2}
# Check consistency between graph and app type parameters
for cpp_index, java_index in index_mapping.items():
if not _type_param_consistent(
graph_actual_type_params[cpp_index], java_app_type_params[java_index]
):
raise RuntimeError(
"Error in check app and graph consistency, type params index {}, cpp: {}, java: {}".format(
cpp_index,
graph_actual_type_params[cpp_index],
java_app_type_params[java_index],
)
)
return True
def run_command(args: str, cwd=None, **kwargs):
logger.info("Running command: %s, cwd: %s", args, cwd)
cp = subprocess.run(shlex.split(args), capture_output=True, cwd=cwd, **kwargs)
if cp.returncode != 0:
err = cp.stderr.decode("utf-8", errors="ignore")
logger.error(
"Failed to run command: %s, error message is: %s",
args,
err,
)
raise RuntimeError(f"Failed to run command: {args}, err: {err}")
return cp.stdout.decode("utf-8", errors="ignore")
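# Illustrative usage (the command is arbitrary): stdout is returned on success;
# on a non-zero exit code a RuntimeError carrying the captured stderr is raised.
#
#   version_str = run_command("cmake --version")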
def delegate_command_to_pod(args: str, pod: str, container: str):
"""Delegate a command to a pod.
Args:
command (str): Command to be delegated.
pod_name (str): Pod name.
namespace (str): Namespace of the pod.
Returns:
str: Output of the command.
"""
# logger.info("Delegate command to pod: %s, %s, %s", args, pod, container)
args = f'kubectl exec -c {container} {pod} -- bash -c "{args}"'
return run_command(args)
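# Illustrative example (pod and container names are hypothetical): the call below
# effectively runs
#   kubectl exec -c engine gs-engine-0 -- bash -c "mkdir -p /tmp/gs/session_x"
#
#   delegate_command_to_pod("mkdir -p /tmp/gs/session_x", "gs-engine-0", "engine")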
def run_kube_cp_command(src, dst, pod, container=None, host_to_pod=True):
if host_to_pod:
cmd = f"kubectl cp {src} {pod}:{dst}"
else:
cmd = f"kubectl cp {pod}:{src} {dst}"
if container is not None:
cmd = f"{cmd} -c {container}"
cmd = f"{cmd} --retries=5"
return run_command(cmd)
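# Illustrative example (names are hypothetical): copying a compiled library back
# from the pod to the coordinator, i.e.
#   kubectl cp gs-engine-0:/tmp/gs/libapp.so /tmp/gs/libapp.so -c engine --retries=5
#
#   run_kube_cp_command(
#       "/tmp/gs/libapp.so", "/tmp/gs/libapp.so", "gs-engine-0", "engine", host_to_pod=False
#   )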
def compile_library(commands, workdir, output_name, launcher):
if launcher.type() == types_pb2.K8S:
return _compile_on_kubernetes(
commands,
workdir,
output_name,
launcher.hosts_list[0],
ANALYTICAL_CONTAINER_NAME,
)
elif launcher.type() == types_pb2.HOSTS:
return _compile_on_local(commands, workdir, output_name)
else:
raise RuntimeError(f"Unsupported launcher type: {launcher.type()}")
def _compile_on_kubernetes(commands, workdir, output_name, pod, container):
logger.info(
"compile on kubernetes, %s, %s, %s, %s, %s",
commands,
workdir,
output_name,
pod,
container,
)
try:
lib_path = get_lib_path(workdir, output_name)
try:
            # The library may already exist in the analytical pod.
test_cmd = f"test -f {lib_path}"
logger.debug(delegate_command_to_pod(test_cmd, pod, container))
logger.info("Library exists, skip compilation")
logger.debug(run_kube_cp_command(lib_path, lib_path, pod, container, False))
return lib_path
except RuntimeError:
pass
parent_dir = os.path.dirname(workdir)
mkdir = f"mkdir -p {parent_dir}"
logger.debug(delegate_command_to_pod(mkdir, pod, container))
logger.debug(run_kube_cp_command(workdir, workdir, pod, container, True))
for command in commands:
command = f"cd {workdir} && {command}"
logger.debug(delegate_command_to_pod(command, pod, container))
logger.debug(run_kube_cp_command(lib_path, lib_path, pod, container, False))
if not os.path.isfile(lib_path):
logger.error("Could not find desired library, found files are:")
logger.error(os.listdir(workdir))
raise FileNotFoundError(lib_path)
except Exception as e:
raise CompilationError(f"Failed to compile {output_name} on kubernetes") from e
return lib_path
def _compile_on_local(commands, workdir, output_name):
logger.info("compile on local, %s, %s, %s", commands, workdir, output_name)
try:
for command in commands:
logger.debug(run_command(command, cwd=workdir))
lib_path = get_lib_path(workdir, output_name)
if not os.path.isfile(lib_path):
logger.error("Could not find desired library")
logger.info(os.listdir(workdir))
raise FileNotFoundError(lib_path)
except Exception as e:
raise CompilationError(
f"Failed to compile {output_name} on platform {get_platform_info()}"
) from e
return lib_path
def compile_app(
workspace: str,
library_name: str,
attr: dict,
engine_config: dict,
launcher,
java_class_path: str,
):
"""Compile an application.
Args:
workspace (str): working dir.
library_name (str): name of library
attr (`AttrValue`): All information needed to compile an app.
engine_config (dict): for options of NETWORKX
Returns:
str: Path of the built library.
str: Java jar path. For c++/python app, return None.
str: Directory containing generated java and jni code. For c++/python app, return None.
str: App type.
"""
logger.info("Building app library...")
library_dir = os.path.join(workspace, library_name)
os.makedirs(library_dir, exist_ok=True)
_extract_gar(library_dir, attr)
# codegen app and graph info
    # vd_type and md_type are None in cpp_pie
(
app_type,
app_header,
app_class,
vd_type,
md_type,
pregel_combine,
java_jar_path,
java_app_class,
) = _codegen_app_info(attr, DEFAULT_GS_CONFIG_FILE, java_class_path)
logger.info(
"Codegened application type: %s, app header: %s, app_class: %s, vd_type: %s, md_type: %s, pregel_combine: %s, \
java_jar_path: %s, java_app_class: %s",
app_type,
app_header,
app_class,
str(vd_type),
str(md_type),
str(pregel_combine),
str(java_jar_path),
str(java_app_class),
)
graph_header, graph_type, graph_oid_type, graph_vid_type = _codegen_graph_info(attr)
if app_type == "java_pie":
logger.info(
"Check consistent between java app %s and graph %s",
java_app_class,
graph_type,
)
check_java_app_graph_consistency(app_class, graph_type, java_app_class)
if app_type == "giraph":
logger.info(
"Check consistent between giraph app %s and graph %s",
java_app_class,
graph_type,
)
check_giraph_app_graph_consistency(app_class, graph_type, java_app_class)
os.chdir(library_dir)
module_name = ""
# Output directory for java codegen
java_codegen_out_dir = ""
# set OPAL_PREFIX in CMAKE_PREFIX_PATH
OPAL_PREFIX = os.environ.get("OPAL_PREFIX", "")
cmake_commands = [
"cmake",
".",
f"-DNETWORKX={engine_config['networkx']}",
f"-DCMAKE_PREFIX_PATH='{GRAPHSCOPE_HOME};{OPAL_PREFIX}'",
]
if types_pb2.CMAKE_EXTRA_OPTIONS in attr:
extra_options = (
attr[types_pb2.CMAKE_EXTRA_OPTIONS]
.s.decode("utf-8", errors="ignore")
.split(" ")
)
cmake_commands.extend(extra_options)
if os.environ.get("GRAPHSCOPE_ANALYTICAL_DEBUG", "") == "1":
cmake_commands.append("-DCMAKE_BUILD_TYPE=Debug")
if app_type == "java_pie" or app_type == "giraph":
        # For java apps we need to run a preprocess step; the generated files
        # can be reused if the fragment and vd types are the same.
java_codegen_out_dir = os.path.join(
workspace, f"{JAVA_CODEGEN_OUTPUT_PREFIX}-{library_name}"
)
        # TODO(zhanglei): Could this codegen caching happen on the engine side?
if os.path.isdir(java_codegen_out_dir):
logger.info(
"Found existing java codegen directory: %s, skipped codegen",
java_codegen_out_dir,
)
cmake_commands += ["-DJAVA_APP_CODEGEN=OFF"]
else:
cmake_commands += ["-DJAVA_APP_CODEGEN=ON"]
cmake_commands += [
"-DENABLE_JAVA_SDK=ON",
"-DJAVA_PIE_APP=ON",
f"-DPRE_CP={GRAPE_PROCESSOR_JAR}:{java_jar_path}",
f"-DPROCESSOR_MAIN_CLASS={PROCESSOR_MAIN_CLASS}",
f"-DJAR_PATH={java_jar_path}",
f"-DOUTPUT_DIR={java_codegen_out_dir}",
]
        # If llvm4jni's run.sh is not found, we just go ahead, since it is optional.
        # The fallback is handled in `gscoordinator/template/CMakeLists.template`.
if LLVM4JNI_HOME:
llvm4jni_user_out_dir = os.path.join(
workspace, f"{LLVM4JNI_USER_OUT_DIR_BASE}-{library_name}"
)
cmake_commands += [
f"-DRUN_LLVM4JNI_SH={os.path.join(LLVM4JNI_HOME, 'run.sh')}",
f"-DLLVM4JNI_OUTPUT={llvm4jni_user_out_dir}",
f"-DLIB_PATH={get_lib_path(library_dir, library_name)}",
]
else:
logger.info(
"Skip running llvm4jni since env var LLVM4JNI_HOME not found or run.sh not found under LLVM4JNI_HOME"
)
elif app_type == "cpp_flash":
cmake_commands += ["-DFLASH_APP=ON"]
elif app_type not in ("cpp_pie", "cpp_pregel"): # Cython
if app_type == "cython_pregel":
pxd_name = "pregel"
cmake_commands += ["-DCYTHON_PREGEL_APP=ON"]
if pregel_combine:
cmake_commands += ["-DENABLE_PREGEL_COMBINE=ON"]
else:
pxd_name = "pie"
cmake_commands += ["-DCYTHON_PIE_APP=ON"]
if "Python_ROOT_DIR" in os.environ:
python3_path = os.path.join(os.environ["Python_ROOT_DIR"], "bin", "python3")
elif "CONDA_PREFIX" in os.environ:
python3_path = os.path.join(os.environ["CONDA_PREFIX"], "bin", "python3")
else:
python3_path = shutil.which("python3")
cmake_commands.append(f"-DPython3_EXECUTABLE={python3_path}")
# Copy pxd file and generate cc file from pyx
shutil.copyfile(
os.path.join(TEMPLATE_DIR, f"{pxd_name}.pxd.template"),
os.path.join(library_dir, f"{pxd_name}.pxd"),
)
        # Assume the gar has one and only one .pyx file
for pyx_file in glob.glob(library_dir + "/*.pyx"):
module_name = os.path.splitext(os.path.basename(pyx_file))[0]
cc_file = os.path.join(library_dir, module_name + ".cc")
subprocess.check_call(["cython", "-3", "--cplus", "-o", cc_file, pyx_file])
app_header = f"{module_name}.h"
# replace and generate cmakelist
cmakelists_file_tmp = os.path.join(TEMPLATE_DIR, "CMakeLists.template")
cmakelists_file = os.path.join(library_dir, "CMakeLists.txt")
with open(cmakelists_file_tmp, mode="r") as template:
content = template.read()
content = Template(content).safe_substitute(
_analytical_engine_home=ANALYTICAL_ENGINE_HOME,
_frame_name=library_name,
_oid_type=graph_oid_type,
_vid_type=graph_vid_type,
_vd_type=vd_type,
_md_type=md_type,
_graph_type=graph_type,
_graph_header=graph_header,
_module_name=module_name,
_app_type=app_class,
_app_header=app_header,
)
with open(cmakelists_file, mode="w") as f:
f.write(content)
# compile
commands = [" ".join(cmake_commands), "make -j2"]
lib_path = compile_library(commands, library_dir, library_name, launcher)
return lib_path, java_jar_path, java_codegen_out_dir, app_type
def compile_graph_frame(
workspace: str,
library_name: str,
attr: dict,
engine_config: dict,
launcher,
):
"""Compile a graph.
Args:
workspace (str): Working dir.
library_name (str): name of library
attr (`AttrValue`): All information needed to compile a graph library.
engine_config (dict): for options of NETWORKX
Raises:
ValueError: When graph_type is not supported.
Returns:
str: Path of the built graph library.
        None: For consistency with compile_app.
        None: For consistency with compile_app.
        None: For consistency with compile_app.
"""
logger.info("Building graph library ...")
_, graph_class, _, _ = _codegen_graph_info(attr)
library_dir = os.path.join(workspace, library_name)
os.makedirs(library_dir, exist_ok=True)
# replace and generate cmakelist
cmakelists_file_tmp = os.path.join(TEMPLATE_DIR, "CMakeLists.template")
cmakelists_file = os.path.join(library_dir, "CMakeLists.txt")
with open(cmakelists_file_tmp, mode="r", encoding="utf-8") as template:
content = template.read()
content = Template(content).safe_substitute(
_analytical_engine_home=ANALYTICAL_ENGINE_HOME,
_frame_name=library_name,
_graph_type=graph_class,
)
with open(cmakelists_file, mode="w", encoding="utf-8") as f:
f.write(content)
# set OPAL_PREFIX in CMAKE_PREFIX_PATH
OPAL_PREFIX = os.environ.get("OPAL_PREFIX", "")
cmake_commands = [
"cmake",
".",
f"-DNETWORKX={engine_config['networkx']}",
f"-DENABLE_JAVA_SDK={engine_config['enable_java_sdk']}",
f"-DCMAKE_PREFIX_PATH='{GRAPHSCOPE_HOME};{OPAL_PREFIX}'",
]
if os.environ.get("GRAPHSCOPE_ANALYTICAL_DEBUG", "") == "1":
cmake_commands.append("-DCMAKE_BUILD_TYPE=Debug")
logger.info("Enable java sdk: %s", engine_config["enable_java_sdk"])
graph_type = attr[types_pb2.GRAPH_TYPE].i
if graph_type == graph_def_pb2.ARROW_PROPERTY:
cmake_commands += ["-DPROPERTY_GRAPH_FRAME=ON"]
elif graph_type in (
graph_def_pb2.ARROW_PROJECTED,
graph_def_pb2.DYNAMIC_PROJECTED,
graph_def_pb2.ARROW_FLATTENED,
):
cmake_commands += ["-DPROJECT_FRAME=ON"]
else:
raise ValueError(f"Illegal graph type: {graph_type}")
# compile
commands = [" ".join(cmake_commands), "make -j2"]
lib_path = compile_library(commands, library_dir, library_name, launcher)
return lib_path, None, None, None
def _type_param_consistent(graph_actual_type_param, java_app_type_param):
    if java_app_type_param == "java.lang.Long":
        return graph_actual_type_param in {"uint64_t", "int64_t"}
    if java_app_type_param == "java.lang.Double":
        return graph_actual_type_param == "double"
    if java_app_type_param == "java.lang.Integer":
        return graph_actual_type_param in {"int32_t", "uint32_t"}
    if java_app_type_param == "com.alibaba.graphscope.ds.StringView":
        return graph_actual_type_param == "std::string"
    return False
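# Examples following directly from the rules above (Java boxed types versus the
# C++ types that appear in fragment template parameters):
#
#   _type_param_consistent("int64_t", "java.lang.Long")                            # True
#   _type_param_consistent("std::string", "com.alibaba.graphscope.ds.StringView")  # True
#   _type_param_consistent("double", "java.lang.Long")                             # False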
def op_pre_process(op, op_result_pool, key_to_op, **kwargs): # noqa: C901
if op.op == types_pb2.CREATE_GRAPH:
_pre_process_for_create_graph_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.ADD_LABELS:
_pre_process_for_add_labels_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.RUN_APP:
_pre_process_for_run_app_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.BIND_APP:
_pre_process_for_bind_app_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.PROJECT_GRAPH:
_pre_process_for_project_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.CONSOLIDATE_COLUMNS:
_pre_process_for_consolidate_columns_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.PROJECT_TO_SIMPLE:
_pre_process_for_project_to_simple_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.ADD_COLUMN:
_pre_process_for_add_column_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.UNLOAD_GRAPH:
_pre_process_for_unload_graph_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.ARCHIVE_GRAPH:
_pre_process_for_archive_graph_op(op, op_result_pool, key_to_op, **kwargs)
if op.op in (
types_pb2.CONTEXT_TO_NUMPY,
types_pb2.CONTEXT_TO_DATAFRAME,
types_pb2.TO_VINEYARD_TENSOR,
types_pb2.TO_VINEYARD_DATAFRAME,
types_pb2.OUTPUT,
):
_pre_process_for_context_op(op, op_result_pool, key_to_op, **kwargs)
if op.op in (types_pb2.GRAPH_TO_NUMPY, types_pb2.GRAPH_TO_DATAFRAME):
_pre_process_for_output_graph_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.UNLOAD_APP:
_pre_process_for_unload_app_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.UNLOAD_CONTEXT:
_pre_process_for_unload_context_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.DATA_SINK:
_pre_process_for_data_sink_op(op, op_result_pool, key_to_op, **kwargs)
if op.op in (types_pb2.TO_DIRECTED, types_pb2.TO_UNDIRECTED):
_pre_process_for_transform_op(op, op_result_pool, key_to_op, **kwargs)
def _pre_process_for_create_graph_op(op, op_result_pool, key_to_op, **kwargs):
assert len(op.parents) <= 1
if len(op.parents) == 1:
key_of_parent_op = op.parents[0]
parent_op = key_to_op[key_of_parent_op]
if parent_op.op == types_pb2.DATA_SOURCE:
op.large_attr.CopyFrom(parent_op.large_attr)
    # Loading a graph in giraph format needs the JVM environment.
if "engine_java_class_path" in kwargs:
engine_java_class_path = kwargs.pop("engine_java_class_path")
op.attr[types_pb2.JAVA_CLASS_PATH].CopyFrom(
utils.s_to_attr(engine_java_class_path)
)
if "engine_jvm_opts" in kwargs:
engine_jvm_opts = kwargs.pop("engine_jvm_opts")
op.attr[types_pb2.JVM_OPTS].CopyFrom(utils.s_to_attr(engine_jvm_opts))
def _pre_process_for_add_labels_op(op, op_result_pool, key_to_op, **kwargs):
assert len(op.parents) == 2
for key_of_parent_op in op.parents:
parent_op = key_to_op[key_of_parent_op]
if parent_op.op == types_pb2.DATA_SOURCE:
op.large_attr.CopyFrom(parent_op.large_attr)
else:
result = op_result_pool[key_of_parent_op]
op.attr[types_pb2.GRAPH_NAME].CopyFrom(
utils.s_to_attr(result.graph_def.key)
)
def _pre_process_for_transform_op(op, op_result_pool, key_to_op, **kwargs):
assert len(op.parents) == 1
result = op_result_pool[op.parents[0]]
    # To be compatible with eager evaluation cases where the key is already present.
if types_pb2.GRAPH_NAME not in op.attr:
op.attr[types_pb2.GRAPH_NAME].CopyFrom(utils.s_to_attr(result.graph_def.key))
# get `bind_app` runtime information in lazy mode
def _pre_process_for_bind_app_op(op, op_result_pool, key_to_op, **kwargs):
for key_of_parent_op in op.parents:
parent_op = key_to_op[key_of_parent_op]
if parent_op.op == types_pb2.CREATE_APP:
# app assets
op.attr[types_pb2.APP_ALGO].CopyFrom(parent_op.attr[types_pb2.APP_ALGO])
if types_pb2.GAR in parent_op.attr:
op.attr[types_pb2.GAR].CopyFrom(parent_op.attr[types_pb2.GAR])
else:
# get graph runtime information from results
result = op_result_pool[key_of_parent_op]
op.attr[types_pb2.GRAPH_NAME].CopyFrom(
attr_value_pb2.AttrValue(
s=result.graph_def.key.encode("utf-8", errors="ignore")
)
)
op.attr[types_pb2.GRAPH_TYPE].CopyFrom(
attr_value_pb2.AttrValue(i=result.graph_def.graph_type)
)
assert result.graph_def.extension.Is(
graph_def_pb2.VineyardInfoPb.DESCRIPTOR
) or result.graph_def.extension.Is(
graph_def_pb2.MutableGraphInfoPb.DESCRIPTOR
)
if result.graph_def.extension.Is(graph_def_pb2.VineyardInfoPb.DESCRIPTOR):
vy_info = graph_def_pb2.VineyardInfoPb()
result.graph_def.extension.Unpack(vy_info)
op.attr[types_pb2.OID_TYPE].CopyFrom(
utils.s_to_attr(
utils.normalize_data_type_str(
utils.data_type_to_cpp(vy_info.oid_type)
)
)
)
op.attr[types_pb2.VID_TYPE].CopyFrom(
utils.s_to_attr(
utils.normalize_data_type_str(
utils.data_type_to_cpp(vy_info.vid_type)
)
)
)
op.attr[types_pb2.V_DATA_TYPE].CopyFrom(
utils.s_to_attr(utils.data_type_to_cpp(vy_info.vdata_type))
)
op.attr[types_pb2.E_DATA_TYPE].CopyFrom(
utils.s_to_attr(utils.data_type_to_cpp(vy_info.edata_type))
)
elif result.graph_def.extension.Is(
graph_def_pb2.MutableGraphInfoPb.DESCRIPTOR
):
graph_info = graph_def_pb2.MutableGraphInfoPb()
result.graph_def.extension.Unpack(graph_info)
op.attr[types_pb2.V_DATA_TYPE].CopyFrom(
utils.s_to_attr(utils.data_type_to_cpp(graph_info.vdata_type))
)
op.attr[types_pb2.E_DATA_TYPE].CopyFrom(
utils.s_to_attr(utils.data_type_to_cpp(graph_info.edata_type))
)
# get `run_app` runtime information in lazy mode
def _pre_process_for_run_app_op(op, op_result_pool, key_to_op, **kwargs):
# run_app op has only one parent
assert len(op.parents) == 1
key_of_parent_op = op.parents[0]
parent_op = key_to_op[key_of_parent_op]
assert parent_op.op == types_pb2.BIND_APP
# set graph key
op.attr[types_pb2.GRAPH_NAME].CopyFrom(parent_op.attr[types_pb2.GRAPH_NAME])
result = op_result_pool[key_of_parent_op]
# set app key
op.attr[types_pb2.APP_NAME].CopyFrom(
attr_value_pb2.AttrValue(
s=result.result.decode("utf-8", errors="ignore").encode(
"utf-8", errors="ignore"
)
)
)
    # Loading a graph in giraph format needs the JVM environment.
if "engine_java_class_path" in kwargs:
engine_java_class_path = kwargs.pop("engine_java_class_path")
op.attr[types_pb2.JAVA_CLASS_PATH].CopyFrom(
utils.s_to_attr(engine_java_class_path)
)
if "engine_jvm_opts" in kwargs:
engine_jvm_opts = kwargs.pop("engine_jvm_opts")
op.attr[types_pb2.JVM_OPTS].CopyFrom(utils.s_to_attr(engine_jvm_opts))
app_type = parent_op.attr[types_pb2.APP_ALGO].s.decode("utf-8", errors="ignore")
if app_type.startswith("java_pie:") or app_type.startswith("giraph:"):
logger.debug("args length: %s", len(op.query_args.args))
if len(op.query_args.args) == 1:
original_json_param = data_types_pb2.StringValue()
op.query_args.args[0].Unpack(original_json_param)
logger.debug("original user params: %s", original_json_param)
user_params = json.loads(original_json_param.value)
del op.query_args.args[0]
elif len(op.query_args.args) == 0:
user_params = {}
else:
raise RuntimeError(
"Unexpected num of params: {}".format(len(op.query_args.args))
)
        # We need to pass extra params via the first arg.
user_params["jar_name"] = engine_java_class_path
user_params["frag_name"] = "gs::ArrowProjectedFragment<{},{},{},{}>".format(
parent_op.attr[types_pb2.OID_TYPE].s.decode("utf-8", errors="ignore"),
parent_op.attr[types_pb2.VID_TYPE].s.decode("utf-8", errors="ignore"),
parent_op.attr[types_pb2.V_DATA_TYPE].s.decode("utf-8", errors="ignore"),
parent_op.attr[types_pb2.E_DATA_TYPE].s.decode("utf-8", errors="ignore"),
)
        # For giraph apps, we need to add args into the original query_args, which is a json string.
        # The first one should be the user params, the second should be the lib_path.
if app_type.startswith("giraph:"):
user_params["app_class"] = GIRAPH_DRIVER_CLASS
user_params["user_app_class"] = app_type[7:]
else:
user_params["app_class"] = app_type.split(":")[-1]
logger.debug("user params: %s", json.dumps(user_params))
new_user_param = Any()
new_user_param.Pack(data_types_pb2.StringValue(value=json.dumps(user_params)))
op.query_args.args.extend([new_user_param])
# For java app, we need lib path as an explicit arg.
lib_param = Any()
lib_path = parent_op.attr[types_pb2.APP_LIBRARY_PATH].s.decode(
"utf-8", errors="ignore"
)
logger.info("Java app: Lib path: %s", lib_path)
lib_param.Pack(data_types_pb2.StringValue(value=lib_path))
op.query_args.args.extend([lib_param])
def _pre_process_for_unload_graph_op(op, op_result_pool, key_to_op, **kwargs):
assert len(op.parents) == 1
key_of_parent_op = op.parents[0]
result = op_result_pool[key_of_parent_op]
op.attr[types_pb2.GRAPH_NAME].CopyFrom(utils.s_to_attr(result.graph_def.key))
if result.graph_def.extension.Is(graph_def_pb2.VineyardInfoPb.DESCRIPTOR):
vy_info = graph_def_pb2.VineyardInfoPb()
result.graph_def.extension.Unpack(vy_info)
op.attr[types_pb2.VINEYARD_ID].CopyFrom(utils.i_to_attr(vy_info.vineyard_id))
def _pre_process_for_unload_app_op(op, op_result_pool, key_to_op, **kwargs):
assert len(op.parents) == 1
key_of_parent_op = op.parents[0]
result = op_result_pool[key_of_parent_op]
op.attr[types_pb2.APP_NAME].CopyFrom(
utils.s_to_attr(result.result.decode("utf-8", errors="ignore"))
)
def _pre_process_for_unload_context_op(op, op_result_pool, key_to_op, **kwargs):
assert len(op.parents) == 1
key_of_parent_op = op.parents[0]
result = op_result_pool[key_of_parent_op]
parent_op_result = json.loads(result.result.decode("utf-8", errors="ignore"))
context_key = parent_op_result["context_key"]
op.attr[types_pb2.CONTEXT_KEY].CopyFrom(
attr_value_pb2.AttrValue(s=context_key.encode("utf-8", errors="ignore"))
)
def _pre_process_for_add_column_op(op, op_result_pool, key_to_op, **kwargs):
for key_of_parent_op in op.parents:
parent_op = key_to_op[key_of_parent_op]
if parent_op.op != types_pb2.RUN_APP:
# get graph information
r = op_result_pool[key_of_parent_op]
graph_name = r.graph_def.key
graph_type = r.graph_def.graph_type
schema = GraphSchema()
schema.from_graph_def(r.graph_def)
for key_of_parent_op in op.parents:
parent_op = key_to_op[key_of_parent_op]
if parent_op.op == types_pb2.RUN_APP:
selector = op.attr[types_pb2.SELECTOR].s.decode("utf-8", errors="ignore")
r = op_result_pool[key_of_parent_op]
parent_op_result = json.loads(r.result.decode("utf-8", errors="ignore"))
context_key = parent_op_result["context_key"]
context_type = parent_op_result["context_type"]
selector = _transform_dataframe_selector(context_type, schema, selector)
op.attr[types_pb2.GRAPH_NAME].CopyFrom(utils.s_to_attr(graph_name))
op.attr[types_pb2.GRAPH_TYPE].CopyFrom(utils.graph_type_to_attr(graph_type))
op.attr[types_pb2.CONTEXT_KEY].CopyFrom(utils.s_to_attr(context_key))
op.attr[types_pb2.SELECTOR].CopyFrom(utils.s_to_attr(selector))
def _pre_process_for_context_op(op, op_result_pool, key_to_op, **kwargs):
def __backtrack_key_of_graph_op(key):
bfs_queue = Queue()
bfs_queue.put(key)
while not bfs_queue.empty():
next_op_key = bfs_queue.get()
if next_op_key in key_to_op:
next_op = key_to_op[next_op_key]
if next_op.op in (
types_pb2.CREATE_GRAPH,
types_pb2.ADD_COLUMN,
types_pb2.ADD_LABELS,
types_pb2.TRANSFORM_GRAPH,
types_pb2.PROJECT_GRAPH,
types_pb2.PROJECT_TO_SIMPLE,
types_pb2.TO_DIRECTED,
types_pb2.TO_UNDIRECTED,
):
return next_op
for parent_key in next_op.parents:
bfs_queue.put(parent_key)
return None
assert len(op.parents) == 1
schema = None
key_of_parent_op = op.parents[0]
graph_op = __backtrack_key_of_graph_op(key_of_parent_op)
r = op_result_pool[key_of_parent_op]
# set context key
parent_op_result = json.loads(r.result.decode("utf-8", errors="ignore"))
context_key = parent_op_result["context_key"]
context_type = parent_op_result["context_type"]
op.attr[types_pb2.CONTEXT_KEY].CopyFrom(
attr_value_pb2.AttrValue(s=context_key.encode("utf-8", errors="ignore"))
)
r = op_result_pool[graph_op.key]
# transform selector
schema = GraphSchema()
schema.from_graph_def(r.graph_def)
selector = op.attr[types_pb2.SELECTOR].s.decode("utf-8", errors="ignore")
if op.op in (
types_pb2.CONTEXT_TO_DATAFRAME,
types_pb2.TO_VINEYARD_DATAFRAME,
types_pb2.OUTPUT,
):
selector = _transform_dataframe_selector(context_type, schema, selector)
else:
# to numpy
selector = _transform_numpy_selector(context_type, schema, selector)
if selector is not None:
op.attr[types_pb2.SELECTOR].CopyFrom(
attr_value_pb2.AttrValue(s=selector.encode("utf-8", errors="ignore"))
)
def _pre_process_for_data_sink_op(op, op_result_pool, key_to_op, **kwargs):
assert len(op.parents) == 1
key_of_parent_op = op.parents[0]
parent_op = key_to_op[key_of_parent_op]
result = op_result_pool[key_of_parent_op]
if parent_op.output_type in (
types_pb2.VINEYARD_TENSOR,
types_pb2.VINEYARD_DATAFRAME,
):
        # Depends on to_vineyard_tensor / to_vineyard_dataframe.
r = json.loads(result.result.decode("utf-8", errors="ignore"))["object_id"]
op.attr[types_pb2.VINEYARD_ID].CopyFrom(utils.s_to_attr(r))
def _pre_process_for_output_graph_op(op, op_result_pool, key_to_op, **kwargs):
assert len(op.parents) == 1
key_of_parent_op = op.parents[0]
r = op_result_pool[key_of_parent_op]
schema = GraphSchema()
schema.from_graph_def(r.graph_def)
graph_name = r.graph_def.key
selector = op.attr[types_pb2.SELECTOR].s.decode("utf-8", errors="ignore")
if op.op == types_pb2.GRAPH_TO_DATAFRAME:
selector = _transform_dataframe_selector(
"labeled_vertex_property", schema, selector
)
else:
# to numpy
selector = _transform_numpy_selector(
"labeled_vertex_property", schema, selector
)
if selector is not None:
op.attr[types_pb2.SELECTOR].CopyFrom(
attr_value_pb2.AttrValue(s=selector.encode("utf-8", errors="ignore"))
)
op.attr[types_pb2.GRAPH_NAME].CopyFrom(
attr_value_pb2.AttrValue(s=graph_name.encode("utf-8", errors="ignore"))
)
def _pre_process_for_project_to_simple_op( # noqa: C901
op, op_result_pool, key_to_op, **kwargs
):
assert len(op.parents) == 1
key_of_parent_op = op.parents[0]
r = op_result_pool[key_of_parent_op]
# for nx graph
if r.graph_def.graph_type == graph_def_pb2.DYNAMIC_PROPERTY:
graph_info = graph_def_pb2.MutableGraphInfoPb()
r.graph_def.extension.Unpack(graph_info)
schema = json.loads(graph_info.property_schema_json)
graph_name = r.graph_def.key
v_prop = op.attr[types_pb2.V_PROP_KEY].s.decode("utf-8", errors="ignore")
e_prop = op.attr[types_pb2.E_PROP_KEY].s.decode("utf-8", errors="ignore")
v_prop_type = graph_def_pb2.NULLVALUE
e_prop_type = graph_def_pb2.NULLVALUE
if v_prop != "None" and v_prop in schema["vertex"]:
v_prop_type = schema["vertex"][v_prop]
if e_prop != "None" and e_prop in schema["edge"]:
e_prop_type = schema["edge"][e_prop]
op.attr[types_pb2.GRAPH_NAME].CopyFrom(
attr_value_pb2.AttrValue(s=graph_name.encode("utf-8", errors="ignore"))
)
op.attr[types_pb2.GRAPH_TYPE].CopyFrom(
utils.graph_type_to_attr(graph_def_pb2.DYNAMIC_PROJECTED)
)
op.attr[types_pb2.V_DATA_TYPE].CopyFrom(
utils.s_to_attr(utils.data_type_to_cpp(v_prop_type))
)
op.attr[types_pb2.E_DATA_TYPE].CopyFrom(
utils.s_to_attr(utils.data_type_to_cpp(e_prop_type))
)
return
# for arrow property graph
def _check_v_prop_exists_in_all_v_labels(schema, prop):
exists = True
for v_label in schema.vertex_labels:
exists = exists and schema.vertex_property_exists(v_label, prop)
return exists
def _check_e_prop_exists_in_all_e_labels(schema, prop):
exists = True
for e_label in schema.edge_labels:
exists = exists and schema.edge_property_exists(e_label, prop)
return exists
# get parent graph schema
schema = GraphSchema()
schema.from_graph_def(r.graph_def)
graph_name = r.graph_def.key
if schema.vertex_label_num == 0:
raise RuntimeError(
"Failed to project to simple graph as no vertex exists in this graph."
)
if schema.edge_label_num == 0:
raise RuntimeError(
"Failed to project to simple graph as no edge exists in this graph."
)
need_flatten_graph = False
if schema.vertex_label_num > 1 or schema.edge_label_num > 1:
need_flatten_graph = True
# check and get vertex property
v_prop = op.attr[types_pb2.V_PROP_KEY].s.decode("utf-8", errors="ignore")
if v_prop == "None":
v_prop_id = -1
v_prop_type = graph_def_pb2.NULLVALUE
if not need_flatten_graph:
# for projected graph
            # if there is only one property on the label, use this property
v_label = schema.vertex_labels[0]
if schema.vertex_properties_num(v_label) == 1:
v_prop = schema.get_vertex_properties(v_label)[0]
v_prop_id = v_prop.id
v_prop_type = v_prop.type
else:
        # v_prop should exist in all labels
if not _check_v_prop_exists_in_all_v_labels(schema, v_prop):
raise RuntimeError(
"Property {0} doesn't exists in all vertex labels".format(v_prop)
)
# get vertex property id
v_prop_id = schema.get_vertex_property_id(schema.vertex_labels[0], v_prop)
# get vertex property type
v_prop_type = graph_def_pb2.NULLVALUE
v_props = schema.get_vertex_properties(schema.vertex_labels[0])
for v_prop in v_props:
if v_prop.id == v_prop_id:
v_prop_type = v_prop.type
break
# check and get edge property
e_prop = op.attr[types_pb2.E_PROP_KEY].s.decode("utf-8", errors="ignore")
if e_prop == "None":
e_prop_id = -1
e_prop_type = graph_def_pb2.NULLVALUE
if not need_flatten_graph:
# for projected graph
            # if there is only one property on the label, use this property
e_label = schema.edge_labels[0]
if schema.edge_properties_num(e_label) == 1:
e_prop = schema.get_edge_properties(e_label)[0]
e_prop_id = e_prop.id
e_prop_type = e_prop.type
else:
        # e_prop should exist in all labels
if not _check_e_prop_exists_in_all_e_labels(schema, e_prop):
raise RuntimeError(
"Property {0} doesn't exists in all edge labels".format(e_prop)
)
# get edge property id
e_prop_id = schema.get_edge_property_id(schema.edge_labels[0], e_prop)
# get edge property type
e_props = schema.get_edge_properties(schema.edge_labels[0])
e_prop_type = graph_def_pb2.NULLVALUE
for e_prop in e_props:
if e_prop.id == e_prop_id:
e_prop_type = e_prop.type
break
op.attr[types_pb2.GRAPH_NAME].CopyFrom(
attr_value_pb2.AttrValue(s=graph_name.encode("utf-8", errors="ignore"))
)
op.attr[types_pb2.OID_TYPE].CopyFrom(
utils.s_to_attr(utils.data_type_to_cpp(schema.oid_type))
)
op.attr[types_pb2.VID_TYPE].CopyFrom(
utils.s_to_attr(utils.data_type_to_cpp(schema.vid_type))
)
op.attr[types_pb2.V_DATA_TYPE].CopyFrom(
utils.s_to_attr(utils.data_type_to_cpp(v_prop_type))
)
op.attr[types_pb2.E_DATA_TYPE].CopyFrom(
utils.s_to_attr(utils.data_type_to_cpp(e_prop_type))
)
if need_flatten_graph:
op.attr[types_pb2.GRAPH_TYPE].CopyFrom(
utils.graph_type_to_attr(graph_def_pb2.ARROW_FLATTENED)
)
op.attr[types_pb2.V_PROP_KEY].CopyFrom(utils.s_to_attr(str(v_prop_id)))
op.attr[types_pb2.E_PROP_KEY].CopyFrom(utils.s_to_attr(str(e_prop_id)))
else:
v_label = schema.vertex_labels[0]
e_label = schema.edge_labels[0]
relation = (v_label, v_label)
check_argument(
relation in schema.get_relationships(e_label),
f"Cannot project to simple, Graph doesn't contain such relationship: {v_label} -> {e_label} <- {v_label}.",
)
v_label_id = schema.get_vertex_label_id(v_label)
e_label_id = schema.get_edge_label_id(e_label)
op.attr[types_pb2.GRAPH_TYPE].CopyFrom(
utils.graph_type_to_attr(graph_def_pb2.ARROW_PROJECTED)
)
op.attr[types_pb2.V_LABEL_ID].CopyFrom(utils.i_to_attr(v_label_id))
op.attr[types_pb2.V_PROP_ID].CopyFrom(utils.i_to_attr(v_prop_id))
op.attr[types_pb2.E_LABEL_ID].CopyFrom(utils.i_to_attr(e_label_id))
op.attr[types_pb2.E_PROP_ID].CopyFrom(utils.i_to_attr(e_prop_id))
def _pre_process_for_project_op(op, op_result_pool, key_to_op, **kwargs):
def _get_all_v_props_id(schema, label):
props = schema.get_vertex_properties(label)
return [schema.get_vertex_property_id(label, prop.name) for prop in props]
def _get_all_e_props_id(schema, label):
props = schema.get_edge_properties(label)
return [schema.get_edge_property_id(label, prop.name) for prop in props]
assert len(op.parents) == 1
# get parent graph schema
key_of_parent_op = op.parents[0]
r = op_result_pool[key_of_parent_op]
schema = GraphSchema()
schema.from_graph_def(r.graph_def)
graph_name = r.graph_def.key
vertices = json.loads(
op.attr[types_pb2.VERTEX_COLLECTIONS].s.decode("utf-8", errors="ignore")
)
edges = json.loads(
op.attr[types_pb2.EDGE_COLLECTIONS].s.decode("utf-8", errors="ignore")
)
vertex_collections = {}
edge_collections = {}
for label, props in vertices.items():
label_id = schema.get_vertex_label_id(label)
if props is None:
vertex_collections[label_id] = _get_all_v_props_id(schema, label)
else:
vertex_collections[label_id] = sorted(
[schema.get_vertex_property_id(label, prop) for prop in props]
)
for label, props in edges.items():
relations = schema.get_relationships(label)
valid = False
for src, dst in relations:
if src in vertices and dst in vertices:
valid = True
break
if not valid:
raise ValueError("Cannot find a valid relation in given vertices and edges")
label_id = schema.get_edge_label_id(label)
if props is None:
edge_collections[label_id] = _get_all_e_props_id(schema, label)
else:
edge_collections[label_id] = sorted(
[schema.get_edge_property_id(label, prop) for prop in props]
)
vertex_collections = dict(sorted(vertex_collections.items()))
edge_collections = dict(sorted(edge_collections.items()))
# construct op attr
attr = attr_value_pb2.AttrValue()
v_attr = attr_value_pb2.NameAttrList()
e_attr = attr_value_pb2.NameAttrList()
for label, props in vertex_collections.items():
v_attr.attr[label].CopyFrom(utils.list_i_to_attr(props))
for label, props in edge_collections.items():
e_attr.attr[label].CopyFrom(utils.list_i_to_attr(props))
attr.list.func.extend([v_attr, e_attr])
op.attr[types_pb2.GRAPH_NAME].CopyFrom(
attr_value_pb2.AttrValue(s=graph_name.encode("utf-8", errors="ignore"))
)
op.attr[types_pb2.ARROW_PROPERTY_DEFINITION].CopyFrom(attr)
del op.attr[types_pb2.VERTEX_COLLECTIONS]
del op.attr[types_pb2.EDGE_COLLECTIONS]
def _pre_process_for_consolidate_columns_op(op, op_result_pool, key_to_op, **kwargs):
assert len(op.parents) == 1
# get parent graph schema
key_of_parent_op = op.parents[0]
r = op_result_pool[key_of_parent_op]
graph_name = r.graph_def.key
op.attr[types_pb2.GRAPH_NAME].CopyFrom(
attr_value_pb2.AttrValue(s=graph_name.encode("utf-8", errors="ignore"))
)
def _pre_process_for_archive_graph_op(op, op_result_pool, key_to_op, **kwargs):
assert len(op.parents) == 1
key_of_parent_op = op.parents[0]
result = op_result_pool[key_of_parent_op]
op.attr[types_pb2.GRAPH_NAME].CopyFrom(utils.s_to_attr(result.graph_def.key))
if result.graph_def.extension.Is(graph_def_pb2.VineyardInfoPb.DESCRIPTOR):
vy_info = graph_def_pb2.VineyardInfoPb()
result.graph_def.extension.Unpack(vy_info)
op.attr[types_pb2.VINEYARD_ID].CopyFrom(utils.i_to_attr(vy_info.vineyard_id))
# Below is the selector transformation part, which transforms label / property
# names to the corresponding ids.
def _transform_vertex_data_v(selector):
if selector not in ("v.id", "v.data", "v.label_id"):
raise SyntaxError("selector of v must be 'id', 'data' or 'label_id'")
return selector
def _transform_vertex_data_e(selector):
if selector not in ("e.src", "e.dst", "e.data"):
raise SyntaxError("selector of e must be 'src', 'dst' or 'data'")
return selector
def _transform_vertex_data_r(selector):
if selector != "r":
raise SyntaxError("selector of r must be 'r'")
return selector
def _transform_vertex_property_data_r(selector):
    # The second part of the 'r' selector is a user-defined name,
    # so we allow any string.
return selector
def _transform_labeled_vertex_data_v(schema, label, prop):
label_id = schema.get_vertex_label_id(label)
if prop == "id":
return f"label{label_id}.{prop}"
prop_id = schema.get_vertex_property_id(label, prop)
return f"label{label_id}.property{prop_id}"
def _transform_labeled_vertex_data_e(schema, label, prop):
label_id = schema.get_edge_label_id(label)
if prop in ("src", "dst"):
return f"label{label_id}.{prop}"
    prop_id = schema.get_edge_property_id(label, prop)
return f"label{label_id}.property{prop_id}"
def _transform_labeled_vertex_data_r(schema, label):
label_id = schema.get_vertex_label_id(label)
return f"label{label_id}"
def _transform_labeled_vertex_property_data_r(schema, label, prop):
label_id = schema.get_vertex_label_id(label)
return f"label{label_id}.{prop}"
def transform_vertex_data_selector(schema, selector):
"""Optional values:
    vertex selector: 'v.id', 'v.data', 'v.label_id'
edge selector: 'e.src', 'e.dst', 'e.data'
result selector: 'r'
"""
if selector is None:
raise RuntimeError("selector cannot be None")
segments = selector.split(".")
if len(segments) > 2:
raise SyntaxError("Invalid selector: %s." % selector)
if segments[0] == "v":
selector = _transform_vertex_data_v(selector)
elif segments[0] == "e":
selector = _transform_vertex_data_e(selector)
elif segments[0] == "r":
selector = _transform_vertex_data_r(selector)
else:
raise SyntaxError(f"Invalid selector: {selector}, choose from v / e / r.")
return selector
def transform_vertex_property_data_selector(schema, selector):
"""Optional values:
    vertex selector: 'v.id', 'v.data', 'v.label_id'
edge selector: 'e.src', 'e.dst', 'e.data'
result selector format: 'r.y', y denotes property name.
"""
if selector is None:
raise RuntimeError("selector cannot be None")
segments = selector.split(".")
if len(segments) != 2:
raise SyntaxError(f"Invalid selector: {selector}")
if segments[0] == "v":
selector = _transform_vertex_data_v(selector)
elif segments[0] == "e":
selector = _transform_vertex_data_e(selector)
elif segments[0] == "r":
selector = _transform_vertex_property_data_r(selector)
else:
raise SyntaxError(f"Invalid selector: {selector}, choose from v / e / r.")
return selector
def transform_labeled_vertex_data_selector(schema, selector):
"""Formats: 'v:x.y/id', 'e:x.y/src/dst', 'r:label',
x denotes label name, y denotes property name.
Returned selector will change label name to 'label{id}', where id is x's id in labels.
And change property name to 'property{id}', where id is y's id in properties.
"""
if selector is None:
raise RuntimeError("selector cannot be None")
ret_type, segments = selector.split(":")
if ret_type not in ("v", "e", "r"):
raise SyntaxError(f"Invalid selector: {selector}")
segments = segments.split(".")
ret = ""
if ret_type == "v":
ret = _transform_labeled_vertex_data_v(schema, *segments)
elif ret_type == "e":
ret = _transform_labeled_vertex_data_e(schema, *segments)
elif ret_type == "r":
ret = _transform_labeled_vertex_data_r(schema, *segments)
return "{}:{}".format(ret_type, ret)
def transform_labeled_vertex_property_data_selector(schema, selector):
"""Formats: 'v:x.y/id', 'e:x.y/src/dst', 'r:x.y',
x denotes label name, y denotes property name.
Returned selector will change label name to 'label{id}', where id is x's id in labels.
And change property name to 'property{id}', where id is y's id in properties.
"""
if selector is None:
raise RuntimeError("selector cannot be None")
ret_type, segments = selector.split(":")
if ret_type not in ("v", "e", "r"):
raise SyntaxError(f"Invalid selector: {selector}")
segments = segments.split(".")
ret = ""
if ret_type == "v":
ret = _transform_labeled_vertex_data_v(schema, *segments)
elif ret_type == "e":
ret = _transform_labeled_vertex_data_e(schema, *segments)
elif ret_type == "r":
ret = _transform_labeled_vertex_property_data_r(schema, *segments)
return f"{ret_type}:{ret}"
_transform_selector_func_map = {
"tensor": lambda _, _2: None,
"vertex_data": transform_vertex_data_selector,
"labeled_vertex_data": transform_labeled_vertex_data_selector,
"vertex_property": transform_vertex_property_data_selector,
"labeled_vertex_property": transform_labeled_vertex_property_data_selector,
}
def _transform_numpy_selector(context_type, schema, selector):
return _transform_selector_func_map[context_type](schema, selector)
def _transform_dataframe_selector(context_type, schema, selector):
selector = json.loads(selector)
transform_func = _transform_selector_func_map[context_type]
selector = {key: transform_func(schema, value) for key, value in selector.items()}
return json.dumps(selector)
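# Illustrative example (hypothetical schema and context): for dataframe-like
# outputs the selector is a JSON object mapping column names to per-column
# selectors, and each value is rewritten individually. With a
# "labeled_vertex_property" context and "person" mapped to label0:
#
#   _transform_dataframe_selector(
#       "labeled_vertex_property",
#       schema,
#       '{"node": "v:person.id", "rank": "r:person.pagerank"}',
#   )
#   # -> '{"node": "v:label0.id", "rank": "r:label0.pagerank"}'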
def _extract_gar(app_dir: str, attr):
"""Extract gar to workspace
Args:
workspace (str): Working directory
attr (`AttrValue`): Optionally it can contains the bytes of gar.
"""
fp = BUILTIN_APP_RESOURCE_PATH # default is builtin app resources.
if types_pb2.GAR in attr:
# if gar sent via bytecode in attr, overwrite.
fp = BytesIO(attr[types_pb2.GAR].s)
with zipfile.ZipFile(fp, "r") as zip_ref:
zip_ref.extractall(app_dir)
def _parse_java_app_type(java_class_path, real_algo):
_java_app_type = ""
_frag_param_str = ""
_java_inner_context_type = ""
_vd_type = ""
_java_executable = find_java_exe()
parse_user_app_cmd = [
_java_executable,
"-cp",
"{}".format(java_class_path),
"com.alibaba.graphscope.utils.AppBaseParser",
real_algo,
]
logger.info(" ".join(parse_user_app_cmd))
parse_user_app_process = subprocess.Popen(
parse_user_app_cmd,
env=os.environ.copy(),
encoding="utf-8",
errors="replace",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
bufsize=1,
)
out, err = parse_user_app_process.communicate()
lines = out.split("\n") + err.split("\n")
for line in lines:
logger.info(line)
if len(line) == 0:
continue
elif line.find("Giraph") != -1:
_java_app_type = "giraph"
elif line.find("DefaultPropertyApp") != -1:
_java_app_type = "default_property"
elif line.find("ParallelPropertyApp") != -1:
_java_app_type = "parallel_property"
elif line.find("DefaultAppBase") != -1:
_java_app_type = "default_simple"
elif line.find("ParallelAppBase") != -1:
_java_app_type = "parallel_simple"
elif line.find("Error") != -1:
raise Exception("Error occurred in verifying user app")
elif line.find("TypeParams") != -1:
_frag_param_str = line.split(":")[-1].strip()
elif line.find("ContextType") != -1:
_java_inner_context_type = line.split(":")[-1].strip()
elif line.find("VertexData") != -1:
_vd_type = line.split(":")[-1].strip()
# for giraph app, we manually set java inner ctx type
logger.info(
"Java app type: %s, frag type str: %s, ctx type: %s, vd type %s",
_java_app_type,
_frag_param_str,
_java_inner_context_type,
_vd_type,
)
if (
not _java_app_type
or not _frag_param_str
or not _java_inner_context_type
or not _vd_type
):
raise RuntimeError("Parsed java app error")
parse_user_app_process.wait()
return _java_app_type, _frag_param_str, _java_inner_context_type, _vd_type
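# A sketch of the parser output this function consumes. The lines below are
# hypothetical but contain the substrings matched above; the real output comes
# from com.alibaba.graphscope.utils.AppBaseParser:
#
#   DefaultAppBase
#   TypeParams: java.lang.Long,java.lang.Long,java.lang.Double,java.lang.Double
#   ContextType: VertexDataContext
#   VertexData: java.lang.Double
#
# which would be parsed into
#   ("default_simple",
#    "java.lang.Long,java.lang.Long,java.lang.Double,java.lang.Double",
#    "VertexDataContext",
#    "java.lang.Double")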
def _probe_for_java_app(attr, java_class_path, real_algo):
(
_java_app_type,
_frag_param_str,
_java_inner_context_type,
_vd_type,
) = _parse_java_app_type(java_class_path, real_algo)
if _java_app_type == "giraph":
driver_header = "apps/java_pie/java_pie_projected_default_app.h"
class_name = "gs::JavaPIEProjectedDefaultApp"
elif _java_app_type == "default_property":
driver_header = "apps/java_pie/java_pie_property_default_app.h"
class_name = "gs::JavaPIEPropertyDefaultApp"
elif _java_app_type == "parallel_property":
driver_header = "apps/java_pie/java_pie_property_parallel_app.h"
class_name = "gs::JavaPIEPropertyParallelApp"
elif _java_app_type == "default_simple":
driver_header = "apps/java_pie/java_pie_projected_default_app.h"
class_name = "gs::JavaPIEProjectedDefaultApp"
elif _java_app_type == "parallel_simple":
driver_header = "apps/java_pie/java_pie_projected_parallel_app.h"
class_name = "gs::JavaPIEProjectedParallelApp"
else:
raise RuntimeError(f"Not a supported java_app_type: {_java_app_type}")
return driver_header, class_name, _vd_type, _frag_param_str
def _codegen_app_info(attr, meta_file: str, java_class_path: str):
"""Codegen application by instantiate the template specialization.
Args:
workspace (str): Working directory
meta_file (str): A yaml file that contains metas of all builtin app.
attr (`AttrValue`): For get algorithm name of app.
Raises:
KeyError: If the algorithm name doesn't exist in the `meta_file`
Returns:
type: app_type
app class: for fulfilling the CMakelists.
"""
fp = BUILTIN_APP_RESOURCE_PATH # default is builtin app resources.
if types_pb2.GAR in attr:
# if gar sent via bytecode in attr, overwrite.
fp = BytesIO(attr[types_pb2.GAR].s)
with zipfile.ZipFile(fp, "r") as zip_ref:
with zip_ref.open(meta_file, "r") as f:
config_yaml = yaml.safe_load(f)
algo = attr[types_pb2.APP_ALGO].s.decode("utf-8", errors="ignore")
    # For algos starting with 'giraph:' or 'java_pie:', we don't look up info in the meta file
if algo.startswith("giraph:") or algo.startswith("java_pie:"):
(app_type, real_algo) = algo.split(":")
logger.info("codegen app info for java app: %s", real_algo)
src_header, app_class, vd_type, java_app_template_str = _probe_for_java_app(
attr, java_class_path, real_algo
)
return (
app_type,
src_header,
"{}<_GRAPH_TYPE>".format(app_class),
vd_type,
None,
None,
java_class_path,
"{}<{}>".format(real_algo, java_app_template_str),
)
app_info = None
for app in config_yaml["app"]:
if app["algo"] == algo:
app_type = app["type"] # cpp_pie or cython_pregel or cython_pie, java_pie
if app_type in ("cpp_pie", "cpp_pregel", "cpp_flash"):
app_info = (
app_type,
app["src"],
f"{app['class_name']}<_GRAPH_TYPE>",
None,
None,
None,
None,
None,
)
break
if app_type in ("cython_pregel", "cython_pie"):
# cython app doesn't have c-header file
app_info = (
app_type,
"",
"",
app["vd_type"],
app["md_type"],
app["pregel_combine"],
None,
None,
)
break
if app_info is None:
raise KeyError("Algorithm does not exist in the gar resource.")
return app_info
# A mapping from graph class name to its header file.
GRAPH_HEADER_MAP = {
graph_def_pb2.IMMUTABLE_EDGECUT: (
"grape::ImmutableEdgecutFragment",
"grape/fragment/immutable_edgecut_fragment.h",
),
graph_def_pb2.DYNAMIC_PROJECTED: (
"gs::DynamicProjectedFragment",
"core/fragment/dynamic_projected_fragment.h",
),
graph_def_pb2.ARROW_PROPERTY: (
"vineyard::ArrowFragment",
"vineyard/graph/fragment/arrow_fragment.h",
),
graph_def_pb2.ARROW_PROJECTED: (
"gs::ArrowProjectedFragment",
"core/fragment/arrow_projected_fragment.h",
),
graph_def_pb2.DYNAMIC_PROPERTY: (
"gs::DynamicFragment",
"core/fragment/dynamic_fragment.h",
),
graph_def_pb2.ARROW_FLATTENED: (
"gs::ArrowFlattenedFragment",
"core/fragment/arrow_flattened_fragment.h",
),
}
VERETX_MAP_CLASS_MAP = {
graph_def_pb2.GLOBAL_VERTEX_MAP: "vineyard::ArrowVertexMap<{},{}>",
graph_def_pb2.LOCAL_VERTEX_MAP: "vineyard::ArrowLocalVertexMap<{},{}>",
}
def _codegen_graph_info(attr):
    # These getter functions are intended for lazy evaluation,
    # because the attributes are not always available in all types of graphs.
def oid_type():
if types_pb2.OID_TYPE in attr:
return attr[types_pb2.OID_TYPE].s.decode("utf-8", errors="ignore")
else: # DynamicProjectedFragment doesn't have oid
return None
def vid_type():
return attr[types_pb2.VID_TYPE].s.decode("utf-8", errors="ignore")
def vdata_type():
return attr[types_pb2.V_DATA_TYPE].s.decode("utf-8", errors="ignore")
def edata_type():
return attr[types_pb2.E_DATA_TYPE].s.decode("utf-8", errors="ignore")
def vertex_map_type():
if types_pb2.VERTEX_MAP_TYPE not in attr:
vm_type_enum = graph_def_pb2.GLOBAL_VERTEX_MAP
else:
vm_type_enum = attr[types_pb2.VERTEX_MAP_TYPE].i
def internal_type(t): # The template of vertex map needs special care.
if t == "std::string":
return "vineyard::arrow_string_view"
return t
return VERETX_MAP_CLASS_MAP[vm_type_enum].format(
internal_type(oid_type()), vid_type()
)
def compact_edges():
compact_edges = False
if types_pb2.COMPACT_EDGES in attr:
compact_edges = attr[types_pb2.COMPACT_EDGES].b
return "true" if compact_edges else "false"
graph_type = attr[types_pb2.GRAPH_TYPE].i
graph_class, graph_header = GRAPH_HEADER_MAP[graph_type]
# graph_fqn is the fully qualified name of the graph template on the C++ side
if graph_type == graph_def_pb2.ARROW_PROPERTY:
# in the form of a fully qualified name, e.g.
# vineyard::ArrowFragment<int64_t, uint64_t, vineyard::ArrowLocalVertexMap<int64_t, uint64_t>, false>
graph_fqn = f"{graph_class}<{oid_type()},{vid_type()},{vertex_map_type()},{compact_edges()}>"
elif graph_type == graph_def_pb2.ARROW_PROJECTED:
# gs::ArrowProjectedFragment<int64_t, uint64_t, double, double,vineyard::ArrowLocalVertexMap<int64_t, uint64_t>, false>
graph_fqn = f"{graph_class}<{oid_type()},{vid_type()},{vdata_type()},{edata_type()},{vertex_map_type()},{compact_edges()}>" # noqa: E501
elif graph_type == graph_def_pb2.IMMUTABLE_EDGECUT:
# grape::ImmutableEdgecutFragment<int64_t, uint32_t, double, double>
graph_fqn = (
f"{graph_class}<{oid_type()},{vid_type()},{vdata_type()},{edata_type()}>"
)
elif graph_type == graph_def_pb2.ARROW_FLATTENED:
# gs::ArrowFlattenedFragment<int64_t, uint32_t, double, double>
graph_fqn = (
f"{graph_class}<{oid_type()},{vid_type()},{vdata_type()},{edata_type()}>"
)
elif graph_type == graph_def_pb2.DYNAMIC_PROJECTED:
# gs::DynamicProjectedFragment<double, double>
graph_fqn = f"{graph_class}<{vdata_type()},{edata_type()}>"
else:
raise ValueError(
f"Unknown graph type: {graph_def_pb2.GraphTypePb.Name(graph_type)}"
)
return graph_header, graph_fqn, oid_type(), vid_type()
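# For illustration only (the attribute values below are hypothetical): for an
# ARROW_PROJECTED graph with oid_type "int64_t", vid_type "uint64_t",
# vdata/edata "double" and the default global vertex map, this returns
#   ("core/fragment/arrow_projected_fragment.h",
#    "gs::ArrowProjectedFragment<int64_t,uint64_t,double,double,"
#    "vineyard::ArrowVertexMap<int64_t,uint64_t>,false>",
#    "int64_t", "uint64_t")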
def create_single_op_dag(op_type, config=None):
op_def = op_def_pb2.OpDef(op=op_type, key=uuid.uuid4().hex)
if config:
for k, v in config.items():
op_def.attr[k].CopyFrom(v)
dag = op_def_pb2.DagDef()
dag.op.extend([op_def])
return dag
def dump_as_json(schema, path):
out = {}
items = []
idx = 0
for i, vertex_label in enumerate(schema.vertex_labels):
vertex = {
"id": idx,
"label": vertex_label,
"type": "VERTEX",
"propertyDefList": [],
}
names = schema.vertex_property_names[i]
types = schema.vertex_property_types[i]
for j, name in enumerate(names.s):
vertex["propertyDefList"].append(
{"id": j, "name": name, "data_type": types.s[j].upper()}
)
vertex["indexes"] = []
vertex["indexes"].append({"propertyNames": [names.s[0]]})
items.append(vertex)
idx += 1
for i, edge_label in enumerate(schema.edge_labels):
edge = {"id": idx, "label": edge_label, "type": "EDGE", "propertyDefList": []}
names = schema.edge_property_names[i]
types = schema.edge_property_types[i]
for j, name in enumerate(names.s):
edge["propertyDefList"].append(
{"id": j, "name": name, "data_type": types.s[j].upper()}
)
edge["rawRelationShips"] = []
edge["rawRelationShips"].append(
{"srcVertexLabel": "xx", "dstVertexLabel": "xx"}
)
idx += 1
items.append(edge)
out["types"] = items
out["partitionNum"] = 4
with open(path, "w") as fp:
json.dump(out, fp)
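# For illustration only (labels and properties are hypothetical), the dumped
# JSON looks roughly like:
#
#   {"types": [{"id": 0, "label": "person", "type": "VERTEX",
#               "propertyDefList": [{"id": 0, "name": "name", "data_type": "STRING"}],
#               "indexes": [{"propertyNames": ["name"]}]},
#              {"id": 1, "label": "knows", "type": "EDGE", ...}],
#    "partitionNum": 4}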
def dump_string(schema_string, path):
with open(path, "w") as fp:
fp.write(schema_string)
def parse_readable_memory(value):
value = str(value).strip()
num = value[:-2]
suffix = value[-2:]
try:
float(num)
except ValueError as e:
raise ValueError(f"Argument cannot be interpreted as a number: {value}") from e
if suffix not in ["Ki", "Mi", "Gi"]:
raise ValueError(f"Memory suffix must be one of 'Ki', 'Mi' and 'Gi': {value}")
return value
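# A minimal sketch of the accepted format:
#
#   >>> parse_readable_memory("256Mi")
#   '256Mi'
#   >>> parse_readable_memory("256MB")  # unsupported suffix
#   Traceback (most recent call last):
#       ...
#   ValueError: Memory suffix must be one of 'Ki', 'Mi' and 'Gi': 256MB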
def parse_as_glog_level(log_level):
# log level in glog: INFO=1, DEBUG=10
# log level in python: DEBUG=10, INFO=20
if isinstance(log_level, str):
if log_level == "silent" or log_level == "SILENT":
log_level = -1
else:
log_level = getattr(logging, log_level.upper())
python_to_glog = {0: 100, 10: 10, 20: 1}
return python_to_glog.get(log_level, 0)
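# Examples of the resulting mapping (values follow the table above):
#
#   >>> parse_as_glog_level("info")    # logging.INFO == 20
#   1
#   >>> parse_as_glog_level("debug")   # logging.DEBUG == 10
#   10
#   >>> parse_as_glog_level("silent")  # -1 is not in the table, falls back to 0
#   0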
def str2bool(s):
if isinstance(s, bool):
return s
if s.lower() in ("yes", "true", "t", "y", "1"):
return True
return False
class ResolveMPICmdPrefix(object):
"""
Class for resolving the prefix of an MPI command.
Examples:
.. code:: ipython
>>> # openmpi found
>>> rmcp = ResolveMPICmdPrefix()
>>> (cmd, env) = rmcp.resolve(4, 'h1,h2,h3')
>>> cmd
['mpirun', '--allow-run-as-root',
'-n', '4', '-host', 'h1:2,h2:1,h3:1']
>>> env
{'OMPI_MCA_plm_rsh_agent': '/usr/local/bin/kube_ssh',  # if kube_ssh in $PATH
'OMPI_MCA_btl_vader_single_copy_mechanism': 'none',
'OMPI_MCA_orte_allowed_exit_without_sync': '1'}
>>> # if openmpi not found, change to mpich
>>> rmcp = ResolveMPICmdPrefix()
>>> (cmd, env) = rmcp.resolve(4, 'h1,h2,h3')
>>> cmd
['mpirun', '-n', '4', '-host', 'h1:2,h2:1,h3:1']
>>> env
{} # always empty
>>> # run without mpi on localhost when setting `num_workers` to 1
>>> rmcp = ResolveMPICmdPrefix()
>>> (cmd, env) = rmcp.resolve(1, 'localhost')
>>> cmd
[]
>>> env
{}
"""
_OPENMPI_RSH_AGENT = "OMPI_MCA_plm_rsh_agent"
_KUBE_SSH_EXEC = "kube_ssh"
def __init__(self, rsh_agent=False):
self._rsh_agent = rsh_agent
@staticmethod
def openmpi():
ompi_info = ""
if "OPAL_PREFIX" in os.environ:
ompi_info = os.path.expandvars("$OPAL_PREFIX/bin/ompi_info")
if not ompi_info:
if "OPAL_BINDIR" in os.environ:
ompi_info = os.path.expandvars("$OPAL_BINDIR/ompi_info")
if not ompi_info:
ompi_info = "ompi_info"
try:
subprocess.check_call([ompi_info], stdout=subprocess.DEVNULL)
except FileNotFoundError:
return False
return True
@staticmethod
def alloc(num_workers, hosts):
length = len(hosts)
assert length != 0
proc_num = {}
if num_workers >= length:
quotient = num_workers // length  # integer division, so host counts stay ints
residue = num_workers % length
for host in hosts:
if residue > 0:
proc_num[host] = quotient + 1
residue -= 1
else:
proc_num[host] = quotient
else:
raise RuntimeError("The number of hosts less then num_workers")
return ",".join([f"{host}:{proc_num[host]}" for host in hosts])
@staticmethod
def find_mpi():
mpi = None
if ResolveMPICmdPrefix.openmpi():
if "OPAL_PREFIX" in os.environ:
mpi = os.path.expandvars("$OPAL_PREFIX/bin/mpirun")
if mpi is None:
if "OPAL_BINDIR" in os.environ:
mpi = os.path.expandvars("$OPAL_BINDIR/mpirun")
if mpi is None:
mpi = shutil.which("mpirun")
if mpi is None:
raise RuntimeError("mpirun command not found.")
return mpi
def resolve(self, num_workers: int, hosts: List[str]):
cmd = []
env = {}
if num_workers == 1 and hosts[0] in ("localhost", "127.0.0.1"):
# run without mpi on localhost if workers num is 1
if shutil.which("ssh") is None:
# also need a fake ssh agent
env[self._OPENMPI_RSH_AGENT] = sys.executable
return cmd, env
if self.openmpi():
env["OMPI_MCA_btl_vader_single_copy_mechanism"] = "none"
env["OMPI_MCA_orte_allowed_exit_without_sync"] = "1"
# OMPI sends SIGCONT -> SIGTERM -> SIGKILL to the worker process;
# setting the following MCA parameter to zero eliminates the chance that
# the process is killed before it receives the SIGTERM and performs cleanup.
env["OMPI_MCA_odls_base_sigkill_timeout"] = "0"
if os.environ.get(self._OPENMPI_RSH_AGENT) is None:
rsh_agent_path = shutil.which(self._KUBE_SSH_EXEC)
if self._rsh_agent and rsh_agent_path is not None:
env[self._OPENMPI_RSH_AGENT] = rsh_agent_path
cmd.extend(
[
self.find_mpi(),
"--allow-run-as-root",
"--bind-to",
"none",
]
)
else:
# an rsh/ssh agent is already configured via OMPI_MCA_plm_rsh_agent
cmd.extend([self.find_mpi()])
cmd.extend(["-n", str(num_workers)])
cmd.extend(["-host", self.alloc(num_workers, hosts)])
logger.debug("Resolve mpi cmd prefix: %s", " ".join(cmd))
logger.debug("Resolve mpi env: %s", json.dumps(env))
return cmd, env
# In the analytical engine, we assume label ids of vertex entries are contiguous
# from zero, and property ids of each label are also contiguous from zero.
# When transforming the schema to the interactive engine style, we gather all
# property names and deduplicate them, assign each name an id (its index in the
# sorted vector), then keep a vector<int> for each label that stores the mapping
# from original id to transformed id.
def to_interactive_engine_schema(gsa_schema_json):
gsa_schema = json.loads(gsa_schema_json)
prop_set = set()
vertex_label_num = 0
for item in gsa_schema["types"]:
item["id"] = int(item["id"])
if item["type"] == "VERTEX":
vertex_label_num += 1
for prop in item["propertyDefList"]:
prop["id"] = int(prop["id"])
prop_set.add(prop["name"])
prop_list = sorted(list(prop_set))
mg_schema = copy.deepcopy(gsa_schema)
for item in mg_schema["types"]:
if item["propertyDefList"] == "":
item["propertyDefList"] = []
if item["type"] == "VERTEX":
for prop in item["propertyDefList"]:
prop["id"] = 1 + prop_list.index(prop["name"])
elif item["type"] == "EDGE":
item["id"] = vertex_label_num + item["id"]
for prop in item["propertyDefList"]:
prop["id"] = 1 + prop_list.index(prop["name"])
return json.dumps(mg_schema)
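# Worked example (labels and property names are hypothetical): for a GSA schema
# with one vertex label "person" (property "name") and one edge label "knows"
# (property "weight"), the deduplicated, sorted property list is
# ["name", "weight"], so "name" is re-assigned property id 1 and "weight"
# property id 2, while the edge label id is shifted by the number of vertex
# labels (here 1 + 0 = 1).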
def check_argument(condition, message=None):
if not condition:
if message is None:
message = "in '%s'" % inspect.stack()[1].code_context[0]
raise ValueError(f"Check failed: {message}")
def check_server_ready(endpoint, server="gremlin"):
def _check_gremlin_task(endpoint):
from gremlin_python.driver.client import Client
if "MY_POD_NAME" in os.environ:
# inner kubernetes env
if endpoint == "localhost" or endpoint == "127.0.0.1":
# currently used for the docker-desktop Kubernetes cluster on macOS,
# whose external IP is 'localhost' when the service type is 'LoadBalancer'
logger.info("In kubernetes env, gremlin server is ready.")
return True
try:
client = Client(f"ws://{endpoint}/gremlin", "g")
# May throw
client.submit("g.V().limit(1)").all().result()
logger.info("Gremlin server is ready.")
finally:
try:
client.close()
except: # noqa: E722
pass
return True
def _check_cypher_task(endpoint):
from neo4j import GraphDatabase
if "MY_POD_NAME" in os.environ:
# inner kubernetes env
if endpoint == "localhost" or endpoint == "127.0.0.1":
logger.info("In kubernetes env, cypher server is ready.")
return True
try:
logger.debug("Try to connect to cypher server.")
driver = GraphDatabase.driver(f"neo4j://{endpoint}", auth=("", ""))
# May throw
driver.verify_connectivity()
logger.info("Checked connectivity to cypher server.")
finally:
try:
driver.close()
except: # noqa: E722
pass
return True
executor = ThreadPoolExecutor(max_workers=20)
begin_time = time.time()
while True:
if server == "gremlin":
t = executor.submit(_check_gremlin_task, endpoint)
elif server == "cypher":
t = executor.submit(_check_cypher_task, endpoint)
else:
raise ValueError(
f"Unsupported server type: {server} other than 'gremlin' or 'cypher'"
)
try:
_ = t.result(timeout=30)
except Exception as e:
t.cancel()
error_message = str(e)
else:
executor.shutdown(wait=False)
return True
time.sleep(3)
if time.time() - begin_time > INTERACTIVE_INSTANCE_TIMEOUT_SECONDS:
executor.shutdown(wait=False)
raise TimeoutError(
f"{server.capitalize()} check query failed: {error_message}"
)
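# A minimal usage sketch (the endpoints below are hypothetical):
#
#   check_server_ready("192.168.0.1:8182", server="gremlin")
#   check_server_ready("192.168.0.1:7687", server="cypher")
#
# Both calls block until the server answers a probe query, or raise
# TimeoutError after INTERACTIVE_INSTANCE_TIMEOUT_SECONDS.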
def replace_string_in_dict(dict_obj, old, new):
if isinstance(dict_obj, dict):
for key, value in dict_obj.items():
dict_obj[key] = replace_string_in_dict(value, old, new)
elif isinstance(dict_obj, list):
for index, item in enumerate(dict_obj):
dict_obj[index] = replace_string_in_dict(item, old, new)
elif isinstance(dict_obj, str):
return dict_obj.replace(old, new)
return dict_obj
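# Example: the replacement is applied recursively to string values (keys are
# left untouched), other value types pass through unchanged:
#
#   >>> replace_string_in_dict({"cmd": ["$HOME/bin", 1], "cwd": "$HOME"}, "$HOME", "/root")
#   {'cmd': ['/root/bin', 1], 'cwd': '/root'}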
# Reference: https://gist.github.com/beugley/ccd69945346759eb6142272a6d69b4e0
def human_readable_to_bytes(size):
"""Given a human-readable byte string (e.g. 2G, 30M, 20K),
return the number of bytes. Will raise an exception if the argument has
unexpected form.
"""
# Try to parse the size as if the unit was coded on 1 char.
try:
numeric_size = float(size[:-1])
unit = size[-1]
except ValueError:
try:
# Try to parse the size as if the unit was coded on 2 chars.
numeric_size = float(size[:-2])
unit = size[-2:-1]
except ValueError:
raise ValueError("Can't convert %r to bytes" % size)
unit = unit.upper()
# Now we have a numeric value and a unit. Check the unit and convert to bytes.
if unit == "G":
bytes = numeric_size * 1073741824
elif unit == "M":
bytes = numeric_size * 1048576
elif unit == "K":
bytes = numeric_size * 1024
else:
bytes = numeric_size
return int(bytes)
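# Example conversions (binary multiples: K=1024, M=1024**2, G=1024**3):
#
#   >>> human_readable_to_bytes("20K")
#   20480
#   >>> human_readable_to_bytes("30M")
#   31457280
#   >>> human_readable_to_bytes("2G")
#   2147483648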