coordinator/gscoordinator/utils.py (1,695 lines of code) (raw):

#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import copy import datetime import functools import glob import hashlib import inspect import json import logging import os import shlex import shutil import subprocess import sys import time import traceback import uuid import zipfile from concurrent.futures import ThreadPoolExecutor from io import BytesIO from queue import Queue from string import Template from typing import List import grpc import yaml from google.protobuf.any_pb2 import Any from graphscope.framework import utils from graphscope.framework.errors import CompilationError from graphscope.framework.graph_schema import GraphSchema from graphscope.framework.utils import PipeWatcher from graphscope.framework.utils import find_java_exe from graphscope.framework.utils import get_platform_info from graphscope.framework.utils import get_tempdir from graphscope.proto import attr_value_pb2 from graphscope.proto import data_types_pb2 from graphscope.proto import graph_def_pb2 from graphscope.proto import op_def_pb2 from graphscope.proto import types_pb2 from gscoordinator.constants import ANALYTICAL_CONTAINER_NAME from gscoordinator.version import __version__ logger = logging.getLogger("graphscope") RESOURCE_DIR_NAME = "resource" # runtime workspace try: WORKSPACE = os.environ["GRAPHSCOPE_RUNTIME"] except KeyError: WORKSPACE = os.path.join(get_tempdir(), "gs") # make sure we have permission to create instance workspace try: os.makedirs(os.path.join(WORKSPACE, ".ignore"), exist_ok=True) except: # noqa: E722, pylint: disable=bare-except WORKSPACE = os.path.expanduser("~/.graphscope/gs") # COORDINATOR_HOME # 1) get from gscoordinator python module, if failed, # 2) infer from current directory try: import gscoordinator COORDINATOR_HOME = os.path.abspath(os.path.join(gscoordinator.__file__, "..", "..")) except ModuleNotFoundError: COORDINATOR_HOME = os.path.abspath(os.path.join(__file__, "..", "..")) # GRAPHSCOPE_HOME # 1) get from environment variable `GRAPHSCOPE_HOME`, if not exist, # 2) infer from COORDINATOR_HOME GRAPHSCOPE_HOME = os.environ.get("GRAPHSCOPE_HOME", None) if GRAPHSCOPE_HOME is None: # Note: The order of locations matters possible_locations = [ os.path.join(COORDINATOR_HOME, "graphscope.runtime"), # installed by pip "/opt/graphscope", # installed by gs script "/usr/local", # a popular location ] for location in possible_locations: ANALYTICAL_ENGINE_PATH = os.path.join(location, "bin", "grape_engine") if os.path.isfile(ANALYTICAL_ENGINE_PATH): GRAPHSCOPE_HOME = location break if GRAPHSCOPE_HOME is not None: ANALYTICAL_ENGINE_HOME = GRAPHSCOPE_HOME ANALYTICAL_ENGINE_PATH = os.path.join(GRAPHSCOPE_HOME, "bin", "grape_engine") INTERACTIVE_ENGINE_SCRIPT = os.path.join(GRAPHSCOPE_HOME, "bin", "giectl") else: # resolve from develop source tree # Here the GRAPHSCOPE_HOME has been set to the root of the source tree, # So the engine location doesn't need to check again, # just rely on GRAPHSCOPE_HOME. GRAPHSCOPE_HOME = os.path.join(COORDINATOR_HOME, "..") ANALYTICAL_ENGINE_HOME = os.path.join(GRAPHSCOPE_HOME, "analytical_engine") ANALYTICAL_ENGINE_PATH = os.path.join( ANALYTICAL_ENGINE_HOME, "build", "grape_engine" ) INTERACTIVE_ENGINE_SCRIPT = os.path.join( GRAPHSCOPE_HOME, "interactive_engine", "assembly", "src", "bin", "graphscope", "giectl", ) # template directory for code generation TEMPLATE_DIR = os.path.join(COORDINATOR_HOME, "gscoordinator", "template") # builtin app resource BUILTIN_APP_RESOURCE_PATH = os.path.join( COORDINATOR_HOME, "gscoordinator", "builtin", "app", "builtin_app.gar" ) # default config file in gar resource DEFAULT_GS_CONFIG_FILE = ".gs_conf.yaml" ANALYTICAL_BUILTIN_SPACE = os.path.join(GRAPHSCOPE_HOME, "precompiled", "builtin") # ANALYTICAL_ENGINE_JAVA_HOME ANALYTICAL_ENGINE_JAVA_HOME = ANALYTICAL_ENGINE_HOME ANALYTICAL_ENGINE_JAVA_RUNTIME_JAR = os.path.join( ANALYTICAL_ENGINE_JAVA_HOME, "lib", f"grape-runtime-{__version__}-shaded.jar", ) ANALYTICAL_ENGINE_JAVA_GIRAPH_JAR = os.path.join( ANALYTICAL_ENGINE_JAVA_HOME, "lib", f"grape-giraph-{__version__}-shaded.jar", ) ANALYTICAL_ENGINE_JAVA_INIT_CLASS_PATH = ( f"{ANALYTICAL_ENGINE_JAVA_RUNTIME_JAR}:{ANALYTICAL_ENGINE_JAVA_GIRAPH_JAR}" ) ANALYTICAL_ENGINE_JAVA_JVM_OPTS = f"-Djava.library.path={GRAPHSCOPE_HOME}/lib" ANALYTICAL_ENGINE_JAVA_JVM_OPTS += ( f" -Djava.class.path={ANALYTICAL_ENGINE_JAVA_INIT_CLASS_PATH}" ) # JAVA SDK related CONSTANTS LLVM4JNI_HOME = os.environ.get("LLVM4JNI_HOME", None) LLVM4JNI_USER_OUT_DIR_BASE = "user-llvm4jni-output" PROCESSOR_MAIN_CLASS = "com.alibaba.graphscope.annotation.Main" JAVA_CODEGEN_OUTPUT_PREFIX = "gs-ffi" GRAPE_PROCESSOR_JAR = os.path.join( GRAPHSCOPE_HOME, "lib", f"grape-runtime-{__version__}-shaded.jar" ) GIRAPH_DRIVER_CLASS = "com.alibaba.graphscope.app.GiraphComputationAdaptor" # increase grpc max message size to 2 GB GS_GRPC_MAX_MESSAGE_LENGTH = 2 * 1024 * 1024 * 1024 - 1 # INTERACTIVE_ENGINE_SCRIPT INTERACTIVE_INSTANCE_TIMEOUT_SECONDS = 120 # 2 mins # default threads per worker configuration for GIE/GAIA INTERACTIVE_ENGINE_THREADS_PER_WORKER = 2 def catch_unknown_errors(response_on_error=None, using_yield=False): """A catcher that catches all (unknown) exceptions in gRPC handlers to ensure the client not think the coordinator services is crashed. """ def catch_exceptions(handler): @functools.wraps(handler) def handler_execution(self, request, context): try: if using_yield: for result in handler(self, request, context): yield result else: yield handler(self, request, context) except Exception as exc: error_message = repr(exc) error_traceback = traceback.format_exc() context.set_code(grpc.StatusCode.ABORTED) context.set_details( 'Error occurs in handler: "%s", with traceback: ' % error_message + error_traceback ) if response_on_error is not None: yield response_on_error return handler_execution return catch_exceptions def get_timestamp() -> float: return datetime.datetime.timestamp(datetime.datetime.now()) def get_lib_path(app_dir: str, app_name: str) -> str: if sys.platform == "linux" or sys.platform == "linux2": return os.path.join(app_dir, "lib%s.so" % app_name) elif sys.platform == "darwin": return os.path.join(app_dir, "lib%s.dylib" % app_name) else: raise RuntimeError(f"Unsupported platform {sys.platform}") def get_app_sha256(attr, java_class_path: str): ( app_type, app_header, app_class, vd_type, _, _, java_jar_path, java_app_class, ) = _codegen_app_info(attr, DEFAULT_GS_CONFIG_FILE, java_class_path) graph_header, graph_type, _, _ = _codegen_graph_info(attr) logger.info( "app type: %s (%s), graph type: %s (%s)", app_class, app_header, graph_type, graph_header, ) if app_type == "cpp_pie": app_sha256 = hashlib.sha256( f"{app_type}.{app_class}.{graph_type}".encode("utf-8", errors="ignore") ).hexdigest() elif app_type == "java_pie" or app_type == "giraph": s = hashlib.sha256() s.update(f"{graph_type}.{vd_type}".encode("utf-8", errors="ignore")) app_sha256 = s.hexdigest() logger.info( "app sha256 for app %s with graph %s:%s, is %s", java_app_class, app_type, java_app_class, app_sha256, ) else: s = hashlib.sha256() s.update( f"{app_type}.{app_class}.{graph_type}".encode("utf-8", errors="ignore") ) if types_pb2.GAR in attr: s.update(attr[types_pb2.GAR].s) app_sha256 = s.hexdigest() return app_sha256 def get_graph_sha256(attr): _, graph_class, _, _ = _codegen_graph_info(attr) return hashlib.sha256(graph_class.encode("utf-8", errors="ignore")).hexdigest() def check_java_app_graph_consistency( app_class, cpp_graph_type, java_class_template_str ): splited = cpp_graph_type.split("<") java_app_type_params = java_class_template_str[:-1].split("<")[-1].split(",") if splited[0] == "vineyard::ArrowFragment": if app_class.find("Property") == -1: raise RuntimeError( "Expected property app, inconsistent app and graph {}, {}".format( app_class, cpp_graph_type ) ) if len(java_app_type_params) != 1: raise RuntimeError("Expected 4 type params in java app") if splited[0] == "gs::ArrowProjectedFragment": if app_class.find("Projected") == -1: raise RuntimeError( "Expected Projected app, inconsistent app and graph {}, {}".format( app_class, cpp_graph_type ) ) if len(java_app_type_params) != 4: raise RuntimeError("Expected 4 type params in java app") graph_actual_type_params = splited[1][:-1].split(",") for i in range(0, len(java_app_type_params)): graph_actual_type_param = graph_actual_type_params[i] java_app_type_param = java_app_type_params[i] if not _type_param_consistent(graph_actual_type_param, java_app_type_param): raise RuntimeError( "Error in check app and graph consistency, type params index {}, cpp: {}, java: {}".format( i, graph_actual_type_param, java_app_type_param ) ) return True def check_giraph_app_graph_consistency( app_class, cpp_graph_type, java_class_template_str ): # Split the C++ graph type to get the parameters split_cpp_graph_type = cpp_graph_type.split("<") java_app_type_params = java_class_template_str[:-1].split("<")[-1].split(",") # Ensure the graph type is supported if split_cpp_graph_type[0] != "gs::ArrowProjectedFragment": raise RuntimeError("Giraph app only supports projected graph") # Extract actual type parameters from the graph graph_actual_type_params = split_cpp_graph_type[1][:-1].split(",") # Define expected mapping between graph and app parameters # (cpp index: java index) index_mapping = {0: 0, 2: 1, 3: 2} # oid_t # vdata_t # edata_t # Check consistency between graph and app type parameters for cpp_index, java_index in index_mapping.items(): if not _type_param_consistent( graph_actual_type_params[cpp_index], java_app_type_params[java_index] ): raise RuntimeError( "Error in check app and graph consistency, type params index {}, cpp: {}, java: {}".format( cpp_index, graph_actual_type_params[cpp_index], java_app_type_params[java_index], ) ) return True def run_command(args: str, cwd=None, **kwargs): logger.info("Running command: %s, cwd: %s", args, cwd) cp = subprocess.run(shlex.split(args), capture_output=True, cwd=cwd, **kwargs) if cp.returncode != 0: err = cp.stderr.decode("utf-8", errors="ignore") logger.error( "Failed to run command: %s, error message is: %s", args, err, ) raise RuntimeError(f"Failed to run command: {args}, err: {err}") return cp.stdout.decode("utf-8", errors="ignore") def delegate_command_to_pod(args: str, pod: str, container: str): """Delegate a command to a pod. Args: command (str): Command to be delegated. pod_name (str): Pod name. namespace (str): Namespace of the pod. Returns: str: Output of the command. """ # logger.info("Delegate command to pod: %s, %s, %s", args, pod, container) args = f'kubectl exec -c {container} {pod} -- bash -c "{args}"' return run_command(args) def run_kube_cp_command(src, dst, pod, container=None, host_to_pod=True): if host_to_pod: cmd = f"kubectl cp {src} {pod}:{dst}" else: cmd = f"kubectl cp {pod}:{src} {dst}" if container is not None: cmd = f"{cmd} -c {container}" cmd = f"{cmd} --retries=5" return run_command(cmd) def compile_library(commands, workdir, output_name, launcher): if launcher.type() == types_pb2.K8S: return _compile_on_kubernetes( commands, workdir, output_name, launcher.hosts_list[0], ANALYTICAL_CONTAINER_NAME, ) elif launcher.type() == types_pb2.HOSTS: return _compile_on_local(commands, workdir, output_name) else: raise RuntimeError(f"Unsupported launcher type: {launcher.type()}") def _compile_on_kubernetes(commands, workdir, output_name, pod, container): logger.info( "compile on kubernetes, %s, %s, %s, %s, %s", commands, workdir, output_name, pod, container, ) try: lib_path = get_lib_path(workdir, output_name) try: # The library may exists in the analytical pod. test_cmd = f"test -f {lib_path}" logger.debug(delegate_command_to_pod(test_cmd, pod, container)) logger.info("Library exists, skip compilation") logger.debug(run_kube_cp_command(lib_path, lib_path, pod, container, False)) return lib_path except RuntimeError: pass parent_dir = os.path.dirname(workdir) mkdir = f"mkdir -p {parent_dir}" logger.debug(delegate_command_to_pod(mkdir, pod, container)) logger.debug(run_kube_cp_command(workdir, workdir, pod, container, True)) for command in commands: command = f"cd {workdir} && {command}" logger.debug(delegate_command_to_pod(command, pod, container)) logger.debug(run_kube_cp_command(lib_path, lib_path, pod, container, False)) if not os.path.isfile(lib_path): logger.error("Could not find desired library, found files are:") logger.error(os.listdir(workdir)) raise FileNotFoundError(lib_path) except Exception as e: raise CompilationError(f"Failed to compile {output_name} on kubernetes") from e return lib_path def _compile_on_local(commands, workdir, output_name): logger.info("compile on local, %s, %s, %s", commands, workdir, output_name) try: for command in commands: logger.debug(run_command(command, cwd=workdir)) lib_path = get_lib_path(workdir, output_name) if not os.path.isfile(lib_path): logger.error("Could not find desired library") logger.info(os.listdir(workdir)) raise FileNotFoundError(lib_path) except Exception as e: raise CompilationError( f"Failed to compile {output_name} on platform {get_platform_info()}" ) from e return lib_path def compile_app( workspace: str, library_name: str, attr: dict, engine_config: dict, launcher, java_class_path: str, ): """Compile an application. Args: workspace (str): working dir. library_name (str): name of library attr (`AttrValue`): All information needed to compile an app. engine_config (dict): for options of NETWORKX Returns: str: Path of the built library. str: Java jar path. For c++/python app, return None. str: Directory containing generated java and jni code. For c++/python app, return None. str: App type. """ logger.info("Building app library...") library_dir = os.path.join(workspace, library_name) os.makedirs(library_dir, exist_ok=True) _extract_gar(library_dir, attr) # codegen app and graph info # vd_type and md_type is None in cpp_pie ( app_type, app_header, app_class, vd_type, md_type, pregel_combine, java_jar_path, java_app_class, ) = _codegen_app_info(attr, DEFAULT_GS_CONFIG_FILE, java_class_path) logger.info( "Codegened application type: %s, app header: %s, app_class: %s, vd_type: %s, md_type: %s, pregel_combine: %s, \ java_jar_path: %s, java_app_class: %s", app_type, app_header, app_class, str(vd_type), str(md_type), str(pregel_combine), str(java_jar_path), str(java_app_class), ) graph_header, graph_type, graph_oid_type, graph_vid_type = _codegen_graph_info(attr) if app_type == "java_pie": logger.info( "Check consistent between java app %s and graph %s", java_app_class, graph_type, ) check_java_app_graph_consistency(app_class, graph_type, java_app_class) if app_type == "giraph": logger.info( "Check consistent between giraph app %s and graph %s", java_app_class, graph_type, ) check_giraph_app_graph_consistency(app_class, graph_type, java_app_class) os.chdir(library_dir) module_name = "" # Output directory for java codegen java_codegen_out_dir = "" # set OPAL_PREFIX in CMAKE_PREFIX_PATH OPAL_PREFIX = os.environ.get("OPAL_PREFIX", "") cmake_commands = [ "cmake", ".", f"-DNETWORKX={engine_config['networkx']}", f"-DCMAKE_PREFIX_PATH='{GRAPHSCOPE_HOME};{OPAL_PREFIX}'", ] if types_pb2.CMAKE_EXTRA_OPTIONS in attr: extra_options = ( attr[types_pb2.CMAKE_EXTRA_OPTIONS] .s.decode("utf-8", errors="ignore") .split(" ") ) cmake_commands.extend(extra_options) if os.environ.get("GRAPHSCOPE_ANALYTICAL_DEBUG", "") == "1": cmake_commands.append("-DCMAKE_BUILD_TYPE=Debug") if app_type == "java_pie" or app_type == "giraph": # for java need to run preprocess, and the generated files can be reused, # if the fragment & vd type is same. java_codegen_out_dir = os.path.join( workspace, f"{JAVA_CODEGEN_OUTPUT_PREFIX}-{library_name}" ) # TODO(zhanglei): Could this codegen caching happens on engine side? if os.path.isdir(java_codegen_out_dir): logger.info( "Found existing java codegen directory: %s, skipped codegen", java_codegen_out_dir, ) cmake_commands += ["-DJAVA_APP_CODEGEN=OFF"] else: cmake_commands += ["-DJAVA_APP_CODEGEN=ON"] cmake_commands += [ "-DENABLE_JAVA_SDK=ON", "-DJAVA_PIE_APP=ON", f"-DPRE_CP={GRAPE_PROCESSOR_JAR}:{java_jar_path}", f"-DPROCESSOR_MAIN_CLASS={PROCESSOR_MAIN_CLASS}", f"-DJAR_PATH={java_jar_path}", f"-DOUTPUT_DIR={java_codegen_out_dir}", ] # if run llvm4jni.sh not found, we just go ahead,since it is optional. # The go ahead part moves to `gscoordinator/template/CMakeLists.template` if LLVM4JNI_HOME: llvm4jni_user_out_dir = os.path.join( workspace, f"{LLVM4JNI_USER_OUT_DIR_BASE}-{library_name}" ) cmake_commands += [ f"-DRUN_LLVM4JNI_SH={os.path.join(LLVM4JNI_HOME, 'run.sh')}", f"-DLLVM4JNI_OUTPUT={llvm4jni_user_out_dir}", f"-DLIB_PATH={get_lib_path(library_dir, library_name)}", ] else: logger.info( "Skip running llvm4jni since env var LLVM4JNI_HOME not found or run.sh not found under LLVM4JNI_HOME" ) elif app_type == "cpp_flash": cmake_commands += ["-DFLASH_APP=ON"] elif app_type not in ("cpp_pie", "cpp_pregel"): # Cython if app_type == "cython_pregel": pxd_name = "pregel" cmake_commands += ["-DCYTHON_PREGEL_APP=ON"] if pregel_combine: cmake_commands += ["-DENABLE_PREGEL_COMBINE=ON"] else: pxd_name = "pie" cmake_commands += ["-DCYTHON_PIE_APP=ON"] if "Python_ROOT_DIR" in os.environ: python3_path = os.path.join(os.environ["Python_ROOT_DIR"], "bin", "python3") elif "CONDA_PREFIX" in os.environ: python3_path = os.path.join(os.environ["CONDA_PREFIX"], "bin", "python3") else: python3_path = shutil.which("python3") cmake_commands.append(f"-DPython3_EXECUTABLE={python3_path}") # Copy pxd file and generate cc file from pyx shutil.copyfile( os.path.join(TEMPLATE_DIR, f"{pxd_name}.pxd.template"), os.path.join(library_dir, f"{pxd_name}.pxd"), ) # Assume the gar will have and only have one .pyx file for pyx_file in glob.glob(library_dir + "/*.pyx"): module_name = os.path.splitext(os.path.basename(pyx_file))[0] cc_file = os.path.join(library_dir, module_name + ".cc") subprocess.check_call(["cython", "-3", "--cplus", "-o", cc_file, pyx_file]) app_header = f"{module_name}.h" # replace and generate cmakelist cmakelists_file_tmp = os.path.join(TEMPLATE_DIR, "CMakeLists.template") cmakelists_file = os.path.join(library_dir, "CMakeLists.txt") with open(cmakelists_file_tmp, mode="r") as template: content = template.read() content = Template(content).safe_substitute( _analytical_engine_home=ANALYTICAL_ENGINE_HOME, _frame_name=library_name, _oid_type=graph_oid_type, _vid_type=graph_vid_type, _vd_type=vd_type, _md_type=md_type, _graph_type=graph_type, _graph_header=graph_header, _module_name=module_name, _app_type=app_class, _app_header=app_header, ) with open(cmakelists_file, mode="w") as f: f.write(content) # compile commands = [" ".join(cmake_commands), "make -j2"] lib_path = compile_library(commands, library_dir, library_name, launcher) return lib_path, java_jar_path, java_codegen_out_dir, app_type def compile_graph_frame( workspace: str, library_name: str, attr: dict, engine_config: dict, launcher, ): """Compile a graph. Args: workspace (str): Working dir. library_name (str): name of library attr (`AttrValue`): All information needed to compile a graph library. engine_config (dict): for options of NETWORKX Raises: ValueError: When graph_type is not supported. Returns: str: Path of the built graph library. None: For consistency with compiler_app. None: For consistency with compile_app. None: for consistency with compile_app. """ logger.info("Building graph library ...") _, graph_class, _, _ = _codegen_graph_info(attr) library_dir = os.path.join(workspace, library_name) os.makedirs(library_dir, exist_ok=True) # replace and generate cmakelist cmakelists_file_tmp = os.path.join(TEMPLATE_DIR, "CMakeLists.template") cmakelists_file = os.path.join(library_dir, "CMakeLists.txt") with open(cmakelists_file_tmp, mode="r", encoding="utf-8") as template: content = template.read() content = Template(content).safe_substitute( _analytical_engine_home=ANALYTICAL_ENGINE_HOME, _frame_name=library_name, _graph_type=graph_class, ) with open(cmakelists_file, mode="w", encoding="utf-8") as f: f.write(content) # set OPAL_PREFIX in CMAKE_PREFIX_PATH OPAL_PREFIX = os.environ.get("OPAL_PREFIX", "") cmake_commands = [ "cmake", ".", f"-DNETWORKX={engine_config['networkx']}", f"-DENABLE_JAVA_SDK={engine_config['enable_java_sdk']}", f"-DCMAKE_PREFIX_PATH='{GRAPHSCOPE_HOME};{OPAL_PREFIX}'", ] if os.environ.get("GRAPHSCOPE_ANALYTICAL_DEBUG", "") == "1": cmake_commands.append("-DCMAKE_BUILD_TYPE=Debug") logger.info("Enable java sdk: %s", engine_config["enable_java_sdk"]) graph_type = attr[types_pb2.GRAPH_TYPE].i if graph_type == graph_def_pb2.ARROW_PROPERTY: cmake_commands += ["-DPROPERTY_GRAPH_FRAME=ON"] elif graph_type in ( graph_def_pb2.ARROW_PROJECTED, graph_def_pb2.DYNAMIC_PROJECTED, graph_def_pb2.ARROW_FLATTENED, ): cmake_commands += ["-DPROJECT_FRAME=ON"] else: raise ValueError(f"Illegal graph type: {graph_type}") # compile commands = [" ".join(cmake_commands), "make -j2"] lib_path = compile_library(commands, library_dir, library_name, launcher) return lib_path, None, None, None def _type_param_consistent(graph_actucal_type_param, java_app_type_param): if java_app_type_param == "java.lang.Long": if graph_actucal_type_param in {"uint64_t", "int64_t"}: return True return False if java_app_type_param == "java.lang.Double": if graph_actucal_type_param in {"double"}: return True return False if java_app_type_param == "java.lang.Integer": if graph_actucal_type_param in {"int32_t", "uint32_t"}: return True return False if java_app_type_param == "com.alibaba.graphscope.ds.StringView": if graph_actucal_type_param in {"std::string"}: return True return False return False def op_pre_process(op, op_result_pool, key_to_op, **kwargs): # noqa: C901 if op.op == types_pb2.CREATE_GRAPH: _pre_process_for_create_graph_op(op, op_result_pool, key_to_op, **kwargs) if op.op == types_pb2.ADD_LABELS: _pre_process_for_add_labels_op(op, op_result_pool, key_to_op, **kwargs) if op.op == types_pb2.RUN_APP: _pre_process_for_run_app_op(op, op_result_pool, key_to_op, **kwargs) if op.op == types_pb2.BIND_APP: _pre_process_for_bind_app_op(op, op_result_pool, key_to_op, **kwargs) if op.op == types_pb2.PROJECT_GRAPH: _pre_process_for_project_op(op, op_result_pool, key_to_op, **kwargs) if op.op == types_pb2.CONSOLIDATE_COLUMNS: _pre_process_for_consolidate_columns_op(op, op_result_pool, key_to_op, **kwargs) if op.op == types_pb2.PROJECT_TO_SIMPLE: _pre_process_for_project_to_simple_op(op, op_result_pool, key_to_op, **kwargs) if op.op == types_pb2.ADD_COLUMN: _pre_process_for_add_column_op(op, op_result_pool, key_to_op, **kwargs) if op.op == types_pb2.UNLOAD_GRAPH: _pre_process_for_unload_graph_op(op, op_result_pool, key_to_op, **kwargs) if op.op == types_pb2.ARCHIVE_GRAPH: _pre_process_for_archive_graph_op(op, op_result_pool, key_to_op, **kwargs) if op.op in ( types_pb2.CONTEXT_TO_NUMPY, types_pb2.CONTEXT_TO_DATAFRAME, types_pb2.TO_VINEYARD_TENSOR, types_pb2.TO_VINEYARD_DATAFRAME, types_pb2.OUTPUT, ): _pre_process_for_context_op(op, op_result_pool, key_to_op, **kwargs) if op.op in (types_pb2.GRAPH_TO_NUMPY, types_pb2.GRAPH_TO_DATAFRAME): _pre_process_for_output_graph_op(op, op_result_pool, key_to_op, **kwargs) if op.op == types_pb2.UNLOAD_APP: _pre_process_for_unload_app_op(op, op_result_pool, key_to_op, **kwargs) if op.op == types_pb2.UNLOAD_CONTEXT: _pre_process_for_unload_context_op(op, op_result_pool, key_to_op, **kwargs) if op.op == types_pb2.DATA_SINK: _pre_process_for_data_sink_op(op, op_result_pool, key_to_op, **kwargs) if op.op in (types_pb2.TO_DIRECTED, types_pb2.TO_UNDIRECTED): _pre_process_for_transform_op(op, op_result_pool, key_to_op, **kwargs) def _pre_process_for_create_graph_op(op, op_result_pool, key_to_op, **kwargs): assert len(op.parents) <= 1 if len(op.parents) == 1: key_of_parent_op = op.parents[0] parent_op = key_to_op[key_of_parent_op] if parent_op.op == types_pb2.DATA_SOURCE: op.large_attr.CopyFrom(parent_op.large_attr) # loading graph with giraph format need jvm environ. if "engine_java_class_path" in kwargs: engine_java_class_path = kwargs.pop("engine_java_class_path") op.attr[types_pb2.JAVA_CLASS_PATH].CopyFrom( utils.s_to_attr(engine_java_class_path) ) if "engine_jvm_opts" in kwargs: engine_jvm_opts = kwargs.pop("engine_jvm_opts") op.attr[types_pb2.JVM_OPTS].CopyFrom(utils.s_to_attr(engine_jvm_opts)) def _pre_process_for_add_labels_op(op, op_result_pool, key_to_op, **kwargs): assert len(op.parents) == 2 for key_of_parent_op in op.parents: parent_op = key_to_op[key_of_parent_op] if parent_op.op == types_pb2.DATA_SOURCE: op.large_attr.CopyFrom(parent_op.large_attr) else: result = op_result_pool[key_of_parent_op] op.attr[types_pb2.GRAPH_NAME].CopyFrom( utils.s_to_attr(result.graph_def.key) ) def _pre_process_for_transform_op(op, op_result_pool, key_to_op, **kwargs): assert len(op.parents) == 1 result = op_result_pool[op.parents[0]] # To compatible with eager evaluation cases where it has the key. if types_pb2.GRAPH_NAME not in op.attr: op.attr[types_pb2.GRAPH_NAME].CopyFrom(utils.s_to_attr(result.graph_def.key)) # get `bind_app` runtime information in lazy mode def _pre_process_for_bind_app_op(op, op_result_pool, key_to_op, **kwargs): for key_of_parent_op in op.parents: parent_op = key_to_op[key_of_parent_op] if parent_op.op == types_pb2.CREATE_APP: # app assets op.attr[types_pb2.APP_ALGO].CopyFrom(parent_op.attr[types_pb2.APP_ALGO]) if types_pb2.GAR in parent_op.attr: op.attr[types_pb2.GAR].CopyFrom(parent_op.attr[types_pb2.GAR]) else: # get graph runtime information from results result = op_result_pool[key_of_parent_op] op.attr[types_pb2.GRAPH_NAME].CopyFrom( attr_value_pb2.AttrValue( s=result.graph_def.key.encode("utf-8", errors="ignore") ) ) op.attr[types_pb2.GRAPH_TYPE].CopyFrom( attr_value_pb2.AttrValue(i=result.graph_def.graph_type) ) assert result.graph_def.extension.Is( graph_def_pb2.VineyardInfoPb.DESCRIPTOR ) or result.graph_def.extension.Is( graph_def_pb2.MutableGraphInfoPb.DESCRIPTOR ) if result.graph_def.extension.Is(graph_def_pb2.VineyardInfoPb.DESCRIPTOR): vy_info = graph_def_pb2.VineyardInfoPb() result.graph_def.extension.Unpack(vy_info) op.attr[types_pb2.OID_TYPE].CopyFrom( utils.s_to_attr( utils.normalize_data_type_str( utils.data_type_to_cpp(vy_info.oid_type) ) ) ) op.attr[types_pb2.VID_TYPE].CopyFrom( utils.s_to_attr( utils.normalize_data_type_str( utils.data_type_to_cpp(vy_info.vid_type) ) ) ) op.attr[types_pb2.V_DATA_TYPE].CopyFrom( utils.s_to_attr(utils.data_type_to_cpp(vy_info.vdata_type)) ) op.attr[types_pb2.E_DATA_TYPE].CopyFrom( utils.s_to_attr(utils.data_type_to_cpp(vy_info.edata_type)) ) elif result.graph_def.extension.Is( graph_def_pb2.MutableGraphInfoPb.DESCRIPTOR ): graph_info = graph_def_pb2.MutableGraphInfoPb() result.graph_def.extension.Unpack(graph_info) op.attr[types_pb2.V_DATA_TYPE].CopyFrom( utils.s_to_attr(utils.data_type_to_cpp(graph_info.vdata_type)) ) op.attr[types_pb2.E_DATA_TYPE].CopyFrom( utils.s_to_attr(utils.data_type_to_cpp(graph_info.edata_type)) ) # get `run_app` runtime information in lazy mode def _pre_process_for_run_app_op(op, op_result_pool, key_to_op, **kwargs): # run_app op has only one parent assert len(op.parents) == 1 key_of_parent_op = op.parents[0] parent_op = key_to_op[key_of_parent_op] assert parent_op.op == types_pb2.BIND_APP # set graph key op.attr[types_pb2.GRAPH_NAME].CopyFrom(parent_op.attr[types_pb2.GRAPH_NAME]) result = op_result_pool[key_of_parent_op] # set app key op.attr[types_pb2.APP_NAME].CopyFrom( attr_value_pb2.AttrValue( s=result.result.decode("utf-8", errors="ignore").encode( "utf-8", errors="ignore" ) ) ) # loading graph with giraph format need jvm environ. if "engine_java_class_path" in kwargs: engine_java_class_path = kwargs.pop("engine_java_class_path") op.attr[types_pb2.JAVA_CLASS_PATH].CopyFrom( utils.s_to_attr(engine_java_class_path) ) if "engine_jvm_opts" in kwargs: engine_jvm_opts = kwargs.pop("engine_jvm_opts") op.attr[types_pb2.JVM_OPTS].CopyFrom(utils.s_to_attr(engine_jvm_opts)) app_type = parent_op.attr[types_pb2.APP_ALGO].s.decode("utf-8", errors="ignore") if app_type.startswith("java_pie:") or app_type.startswith("giraph:"): logger.debug("args length: %s", len(op.query_args.args)) if len(op.query_args.args) == 1: original_json_param = data_types_pb2.StringValue() op.query_args.args[0].Unpack(original_json_param) logger.debug("original user params: %s", original_json_param) user_params = json.loads(original_json_param.value) del op.query_args.args[0] elif len(op.query_args.args) == 0: user_params = {} else: raise RuntimeError( "Unexpected num of params: {}".format(len(op.query_args.args)) ) # we need extra param in first arg. user_params["jar_name"] = engine_java_class_path user_params["frag_name"] = "gs::ArrowProjectedFragment<{},{},{},{}>".format( parent_op.attr[types_pb2.OID_TYPE].s.decode("utf-8", errors="ignore"), parent_op.attr[types_pb2.VID_TYPE].s.decode("utf-8", errors="ignore"), parent_op.attr[types_pb2.V_DATA_TYPE].s.decode("utf-8", errors="ignore"), parent_op.attr[types_pb2.E_DATA_TYPE].s.decode("utf-8", errors="ignore"), ) # for giraph app, we need to add args into original query_args, which is a json string # first one should be user params, second should be lib_path if app_type.startswith("giraph:"): user_params["app_class"] = GIRAPH_DRIVER_CLASS user_params["user_app_class"] = app_type[7:] else: user_params["app_class"] = app_type.split(":")[-1] logger.debug("user params: %s", json.dumps(user_params)) new_user_param = Any() new_user_param.Pack(data_types_pb2.StringValue(value=json.dumps(user_params))) op.query_args.args.extend([new_user_param]) # For java app, we need lib path as an explicit arg. lib_param = Any() lib_path = parent_op.attr[types_pb2.APP_LIBRARY_PATH].s.decode( "utf-8", errors="ignore" ) logger.info("Java app: Lib path: %s", lib_path) lib_param.Pack(data_types_pb2.StringValue(value=lib_path)) op.query_args.args.extend([lib_param]) def _pre_process_for_unload_graph_op(op, op_result_pool, key_to_op, **kwargs): assert len(op.parents) == 1 key_of_parent_op = op.parents[0] result = op_result_pool[key_of_parent_op] op.attr[types_pb2.GRAPH_NAME].CopyFrom(utils.s_to_attr(result.graph_def.key)) if result.graph_def.extension.Is(graph_def_pb2.VineyardInfoPb.DESCRIPTOR): vy_info = graph_def_pb2.VineyardInfoPb() result.graph_def.extension.Unpack(vy_info) op.attr[types_pb2.VINEYARD_ID].CopyFrom(utils.i_to_attr(vy_info.vineyard_id)) def _pre_process_for_unload_app_op(op, op_result_pool, key_to_op, **kwargs): assert len(op.parents) == 1 key_of_parent_op = op.parents[0] result = op_result_pool[key_of_parent_op] op.attr[types_pb2.APP_NAME].CopyFrom( utils.s_to_attr(result.result.decode("utf-8", errors="ignore")) ) def _pre_process_for_unload_context_op(op, op_result_pool, key_to_op, **kwargs): assert len(op.parents) == 1 key_of_parent_op = op.parents[0] result = op_result_pool[key_of_parent_op] parent_op_result = json.loads(result.result.decode("utf-8", errors="ignore")) context_key = parent_op_result["context_key"] op.attr[types_pb2.CONTEXT_KEY].CopyFrom( attr_value_pb2.AttrValue(s=context_key.encode("utf-8", errors="ignore")) ) def _pre_process_for_add_column_op(op, op_result_pool, key_to_op, **kwargs): for key_of_parent_op in op.parents: parent_op = key_to_op[key_of_parent_op] if parent_op.op != types_pb2.RUN_APP: # get graph information r = op_result_pool[key_of_parent_op] graph_name = r.graph_def.key graph_type = r.graph_def.graph_type schema = GraphSchema() schema.from_graph_def(r.graph_def) for key_of_parent_op in op.parents: parent_op = key_to_op[key_of_parent_op] if parent_op.op == types_pb2.RUN_APP: selector = op.attr[types_pb2.SELECTOR].s.decode("utf-8", errors="ignore") r = op_result_pool[key_of_parent_op] parent_op_result = json.loads(r.result.decode("utf-8", errors="ignore")) context_key = parent_op_result["context_key"] context_type = parent_op_result["context_type"] selector = _transform_dataframe_selector(context_type, schema, selector) op.attr[types_pb2.GRAPH_NAME].CopyFrom(utils.s_to_attr(graph_name)) op.attr[types_pb2.GRAPH_TYPE].CopyFrom(utils.graph_type_to_attr(graph_type)) op.attr[types_pb2.CONTEXT_KEY].CopyFrom(utils.s_to_attr(context_key)) op.attr[types_pb2.SELECTOR].CopyFrom(utils.s_to_attr(selector)) def _pre_process_for_context_op(op, op_result_pool, key_to_op, **kwargs): def __backtrack_key_of_graph_op(key): bfs_queue = Queue() bfs_queue.put(key) while not bfs_queue.empty(): next_op_key = bfs_queue.get() if next_op_key in key_to_op: next_op = key_to_op[next_op_key] if next_op.op in ( types_pb2.CREATE_GRAPH, types_pb2.ADD_COLUMN, types_pb2.ADD_LABELS, types_pb2.TRANSFORM_GRAPH, types_pb2.PROJECT_GRAPH, types_pb2.PROJECT_TO_SIMPLE, types_pb2.TO_DIRECTED, types_pb2.TO_UNDIRECTED, ): return next_op for parent_key in next_op.parents: bfs_queue.put(parent_key) return None assert len(op.parents) == 1 schema = None key_of_parent_op = op.parents[0] graph_op = __backtrack_key_of_graph_op(key_of_parent_op) r = op_result_pool[key_of_parent_op] # set context key parent_op_result = json.loads(r.result.decode("utf-8", errors="ignore")) context_key = parent_op_result["context_key"] context_type = parent_op_result["context_type"] op.attr[types_pb2.CONTEXT_KEY].CopyFrom( attr_value_pb2.AttrValue(s=context_key.encode("utf-8", errors="ignore")) ) r = op_result_pool[graph_op.key] # transform selector schema = GraphSchema() schema.from_graph_def(r.graph_def) selector = op.attr[types_pb2.SELECTOR].s.decode("utf-8", errors="ignore") if op.op in ( types_pb2.CONTEXT_TO_DATAFRAME, types_pb2.TO_VINEYARD_DATAFRAME, types_pb2.OUTPUT, ): selector = _transform_dataframe_selector(context_type, schema, selector) else: # to numpy selector = _transform_numpy_selector(context_type, schema, selector) if selector is not None: op.attr[types_pb2.SELECTOR].CopyFrom( attr_value_pb2.AttrValue(s=selector.encode("utf-8", errors="ignore")) ) def _pre_process_for_data_sink_op(op, op_result_pool, key_to_op, **kwargs): assert len(op.parents) == 1 key_of_parent_op = op.parents[0] parent_op = key_to_op[key_of_parent_op] result = op_result_pool[key_of_parent_op] if parent_op.output_type in ( types_pb2.VINEYARD_TENSOR, types_pb2.VINEYARD_DATAFRAME, ): # dependent to to_vineyard_dataframe r = json.loads(result.result.decode("utf-8", errors="ignore"))["object_id"] op.attr[types_pb2.VINEYARD_ID].CopyFrom(utils.s_to_attr(r)) def _pre_process_for_output_graph_op(op, op_result_pool, key_to_op, **kwargs): assert len(op.parents) == 1 key_of_parent_op = op.parents[0] r = op_result_pool[key_of_parent_op] schema = GraphSchema() schema.from_graph_def(r.graph_def) graph_name = r.graph_def.key selector = op.attr[types_pb2.SELECTOR].s.decode("utf-8", errors="ignore") if op.op == types_pb2.GRAPH_TO_DATAFRAME: selector = _transform_dataframe_selector( "labeled_vertex_property", schema, selector ) else: # to numpy selector = _transform_numpy_selector( "labeled_vertex_property", schema, selector ) if selector is not None: op.attr[types_pb2.SELECTOR].CopyFrom( attr_value_pb2.AttrValue(s=selector.encode("utf-8", errors="ignore")) ) op.attr[types_pb2.GRAPH_NAME].CopyFrom( attr_value_pb2.AttrValue(s=graph_name.encode("utf-8", errors="ignore")) ) def _pre_process_for_project_to_simple_op( # noqa: C901 op, op_result_pool, key_to_op, **kwargs ): assert len(op.parents) == 1 key_of_parent_op = op.parents[0] r = op_result_pool[key_of_parent_op] # for nx graph if r.graph_def.graph_type == graph_def_pb2.DYNAMIC_PROPERTY: graph_info = graph_def_pb2.MutableGraphInfoPb() r.graph_def.extension.Unpack(graph_info) schema = json.loads(graph_info.property_schema_json) graph_name = r.graph_def.key v_prop = op.attr[types_pb2.V_PROP_KEY].s.decode("utf-8", errors="ignore") e_prop = op.attr[types_pb2.E_PROP_KEY].s.decode("utf-8", errors="ignore") v_prop_type = graph_def_pb2.NULLVALUE e_prop_type = graph_def_pb2.NULLVALUE if v_prop != "None" and v_prop in schema["vertex"]: v_prop_type = schema["vertex"][v_prop] if e_prop != "None" and e_prop in schema["edge"]: e_prop_type = schema["edge"][e_prop] op.attr[types_pb2.GRAPH_NAME].CopyFrom( attr_value_pb2.AttrValue(s=graph_name.encode("utf-8", errors="ignore")) ) op.attr[types_pb2.GRAPH_TYPE].CopyFrom( utils.graph_type_to_attr(graph_def_pb2.DYNAMIC_PROJECTED) ) op.attr[types_pb2.V_DATA_TYPE].CopyFrom( utils.s_to_attr(utils.data_type_to_cpp(v_prop_type)) ) op.attr[types_pb2.E_DATA_TYPE].CopyFrom( utils.s_to_attr(utils.data_type_to_cpp(e_prop_type)) ) return # for arrow property graph def _check_v_prop_exists_in_all_v_labels(schema, prop): exists = True for v_label in schema.vertex_labels: exists = exists and schema.vertex_property_exists(v_label, prop) return exists def _check_e_prop_exists_in_all_e_labels(schema, prop): exists = True for e_label in schema.edge_labels: exists = exists and schema.edge_property_exists(e_label, prop) return exists # get parent graph schema schema = GraphSchema() schema.from_graph_def(r.graph_def) graph_name = r.graph_def.key if schema.vertex_label_num == 0: raise RuntimeError( "Failed to project to simple graph as no vertex exists in this graph." ) if schema.edge_label_num == 0: raise RuntimeError( "Failed to project to simple graph as no edge exists in this graph." ) need_flatten_graph = False if schema.vertex_label_num > 1 or schema.edge_label_num > 1: need_flatten_graph = True # check and get vertex property v_prop = op.attr[types_pb2.V_PROP_KEY].s.decode("utf-8", errors="ignore") if v_prop == "None": v_prop_id = -1 v_prop_type = graph_def_pb2.NULLVALUE if not need_flatten_graph: # for projected graph # if there is only one property on the label, uses this property v_label = schema.vertex_labels[0] if schema.vertex_properties_num(v_label) == 1: v_prop = schema.get_vertex_properties(v_label)[0] v_prop_id = v_prop.id v_prop_type = v_prop.type else: # v_prop should exists in all labels if not _check_v_prop_exists_in_all_v_labels(schema, v_prop): raise RuntimeError( "Property {0} doesn't exists in all vertex labels".format(v_prop) ) # get vertex property id v_prop_id = schema.get_vertex_property_id(schema.vertex_labels[0], v_prop) # get vertex property type v_prop_type = graph_def_pb2.NULLVALUE v_props = schema.get_vertex_properties(schema.vertex_labels[0]) for v_prop in v_props: if v_prop.id == v_prop_id: v_prop_type = v_prop.type break # check and get edge property e_prop = op.attr[types_pb2.E_PROP_KEY].s.decode("utf-8", errors="ignore") if e_prop == "None": e_prop_id = -1 e_prop_type = graph_def_pb2.NULLVALUE if not need_flatten_graph: # for projected graph # if there is only one property on the label, uses this property e_label = schema.edge_labels[0] if schema.edge_properties_num(e_label) == 1: e_prop = schema.get_edge_properties(e_label)[0] e_prop_id = e_prop.id e_prop_type = e_prop.type else: # e_prop should exists in all labels if not _check_e_prop_exists_in_all_e_labels(schema, e_prop): raise RuntimeError( "Property {0} doesn't exists in all edge labels".format(e_prop) ) # get edge property id e_prop_id = schema.get_edge_property_id(schema.edge_labels[0], e_prop) # get edge property type e_props = schema.get_edge_properties(schema.edge_labels[0]) e_prop_type = graph_def_pb2.NULLVALUE for e_prop in e_props: if e_prop.id == e_prop_id: e_prop_type = e_prop.type break op.attr[types_pb2.GRAPH_NAME].CopyFrom( attr_value_pb2.AttrValue(s=graph_name.encode("utf-8", errors="ignore")) ) op.attr[types_pb2.OID_TYPE].CopyFrom( utils.s_to_attr(utils.data_type_to_cpp(schema.oid_type)) ) op.attr[types_pb2.VID_TYPE].CopyFrom( utils.s_to_attr(utils.data_type_to_cpp(schema.vid_type)) ) op.attr[types_pb2.V_DATA_TYPE].CopyFrom( utils.s_to_attr(utils.data_type_to_cpp(v_prop_type)) ) op.attr[types_pb2.E_DATA_TYPE].CopyFrom( utils.s_to_attr(utils.data_type_to_cpp(e_prop_type)) ) if need_flatten_graph: op.attr[types_pb2.GRAPH_TYPE].CopyFrom( utils.graph_type_to_attr(graph_def_pb2.ARROW_FLATTENED) ) op.attr[types_pb2.V_PROP_KEY].CopyFrom(utils.s_to_attr(str(v_prop_id))) op.attr[types_pb2.E_PROP_KEY].CopyFrom(utils.s_to_attr(str(e_prop_id))) else: v_label = schema.vertex_labels[0] e_label = schema.edge_labels[0] relation = (v_label, v_label) check_argument( relation in schema.get_relationships(e_label), f"Cannot project to simple, Graph doesn't contain such relationship: {v_label} -> {e_label} <- {v_label}.", ) v_label_id = schema.get_vertex_label_id(v_label) e_label_id = schema.get_edge_label_id(e_label) op.attr[types_pb2.GRAPH_TYPE].CopyFrom( utils.graph_type_to_attr(graph_def_pb2.ARROW_PROJECTED) ) op.attr[types_pb2.V_LABEL_ID].CopyFrom(utils.i_to_attr(v_label_id)) op.attr[types_pb2.V_PROP_ID].CopyFrom(utils.i_to_attr(v_prop_id)) op.attr[types_pb2.E_LABEL_ID].CopyFrom(utils.i_to_attr(e_label_id)) op.attr[types_pb2.E_PROP_ID].CopyFrom(utils.i_to_attr(e_prop_id)) def _pre_process_for_project_op(op, op_result_pool, key_to_op, **kwargs): def _get_all_v_props_id(schema, label): props = schema.get_vertex_properties(label) return [schema.get_vertex_property_id(label, prop.name) for prop in props] def _get_all_e_props_id(schema, label): props = schema.get_edge_properties(label) return [schema.get_edge_property_id(label, prop.name) for prop in props] assert len(op.parents) == 1 # get parent graph schema key_of_parent_op = op.parents[0] r = op_result_pool[key_of_parent_op] schema = GraphSchema() schema.from_graph_def(r.graph_def) graph_name = r.graph_def.key vertices = json.loads( op.attr[types_pb2.VERTEX_COLLECTIONS].s.decode("utf-8", errors="ignore") ) edges = json.loads( op.attr[types_pb2.EDGE_COLLECTIONS].s.decode("utf-8", errors="ignore") ) vertex_collections = {} edge_collections = {} for label, props in vertices.items(): label_id = schema.get_vertex_label_id(label) if props is None: vertex_collections[label_id] = _get_all_v_props_id(schema, label) else: vertex_collections[label_id] = sorted( [schema.get_vertex_property_id(label, prop) for prop in props] ) for label, props in edges.items(): relations = schema.get_relationships(label) valid = False for src, dst in relations: if src in vertices and dst in vertices: valid = True break if not valid: raise ValueError("Cannot find a valid relation in given vertices and edges") label_id = schema.get_edge_label_id(label) if props is None: edge_collections[label_id] = _get_all_e_props_id(schema, label) else: edge_collections[label_id] = sorted( [schema.get_edge_property_id(label, prop) for prop in props] ) vertex_collections = dict(sorted(vertex_collections.items())) edge_collections = dict(sorted(edge_collections.items())) # construct op attr attr = attr_value_pb2.AttrValue() v_attr = attr_value_pb2.NameAttrList() e_attr = attr_value_pb2.NameAttrList() for label, props in vertex_collections.items(): v_attr.attr[label].CopyFrom(utils.list_i_to_attr(props)) for label, props in edge_collections.items(): e_attr.attr[label].CopyFrom(utils.list_i_to_attr(props)) attr.list.func.extend([v_attr, e_attr]) op.attr[types_pb2.GRAPH_NAME].CopyFrom( attr_value_pb2.AttrValue(s=graph_name.encode("utf-8", errors="ignore")) ) op.attr[types_pb2.ARROW_PROPERTY_DEFINITION].CopyFrom(attr) del op.attr[types_pb2.VERTEX_COLLECTIONS] del op.attr[types_pb2.EDGE_COLLECTIONS] def _pre_process_for_consolidate_columns_op(op, op_result_pool, key_to_op, **kwargs): assert len(op.parents) == 1 # get parent graph schema key_of_parent_op = op.parents[0] r = op_result_pool[key_of_parent_op] graph_name = r.graph_def.key op.attr[types_pb2.GRAPH_NAME].CopyFrom( attr_value_pb2.AttrValue(s=graph_name.encode("utf-8", errors="ignore")) ) def _pre_process_for_archive_graph_op(op, op_result_pool, key_to_op, **kwargs): assert len(op.parents) == 1 key_of_parent_op = op.parents[0] result = op_result_pool[key_of_parent_op] op.attr[types_pb2.GRAPH_NAME].CopyFrom(utils.s_to_attr(result.graph_def.key)) if result.graph_def.extension.Is(graph_def_pb2.VineyardInfoPb.DESCRIPTOR): vy_info = graph_def_pb2.VineyardInfoPb() result.graph_def.extension.Unpack(vy_info) op.attr[types_pb2.VINEYARD_ID].CopyFrom(utils.i_to_attr(vy_info.vineyard_id)) # Below are selector transformation part, which will transform label / property # names to corresponding id. def _transform_vertex_data_v(selector): if selector not in ("v.id", "v.data", "v.label_id"): raise SyntaxError("selector of v must be 'id', 'data' or 'label_id'") return selector def _transform_vertex_data_e(selector): if selector not in ("e.src", "e.dst", "e.data"): raise SyntaxError("selector of e must be 'src', 'dst' or 'data'") return selector def _transform_vertex_data_r(selector): if selector != "r": raise SyntaxError("selector of r must be 'r'") return selector def _transform_vertex_property_data_r(selector): # The second part of selector or r is user defined name. # So we will allow any str return selector def _transform_labeled_vertex_data_v(schema, label, prop): label_id = schema.get_vertex_label_id(label) if prop == "id": return f"label{label_id}.{prop}" prop_id = schema.get_vertex_property_id(label, prop) return f"label{label_id}.property{prop_id}" def _transform_labeled_vertex_data_e(schema, label, prop): label_id = schema.get_edge_label_id(label) if prop in ("src", "dst"): return f"label{label_id}.{prop}" prop_id = schema.get_vertex_property_id(label, prop) return f"label{label_id}.property{prop_id}" def _transform_labeled_vertex_data_r(schema, label): label_id = schema.get_vertex_label_id(label) return f"label{label_id}" def _transform_labeled_vertex_property_data_r(schema, label, prop): label_id = schema.get_vertex_label_id(label) return f"label{label_id}.{prop}" def transform_vertex_data_selector(schema, selector): """Optional values: vertex selector: 'v.id', 'v.data' edge selector: 'e.src', 'e.dst', 'e.data' result selector: 'r' """ if selector is None: raise RuntimeError("selector cannot be None") segments = selector.split(".") if len(segments) > 2: raise SyntaxError("Invalid selector: %s." % selector) if segments[0] == "v": selector = _transform_vertex_data_v(selector) elif segments[0] == "e": selector = _transform_vertex_data_e(selector) elif segments[0] == "r": selector = _transform_vertex_data_r(selector) else: raise SyntaxError(f"Invalid selector: {selector}, choose from v / e / r.") return selector def transform_vertex_property_data_selector(schema, selector): """Optional values: vertex selector: 'v.id', 'v.data' edge selector: 'e.src', 'e.dst', 'e.data' result selector format: 'r.y', y denotes property name. """ if selector is None: raise RuntimeError("selector cannot be None") segments = selector.split(".") if len(segments) != 2: raise SyntaxError(f"Invalid selector: {selector}") if segments[0] == "v": selector = _transform_vertex_data_v(selector) elif segments[0] == "e": selector = _transform_vertex_data_e(selector) elif segments[0] == "r": selector = _transform_vertex_property_data_r(selector) else: raise SyntaxError(f"Invalid selector: {selector}, choose from v / e / r.") return selector def transform_labeled_vertex_data_selector(schema, selector): """Formats: 'v:x.y/id', 'e:x.y/src/dst', 'r:label', x denotes label name, y denotes property name. Returned selector will change label name to 'label{id}', where id is x's id in labels. And change property name to 'property{id}', where id is y's id in properties. """ if selector is None: raise RuntimeError("selector cannot be None") ret_type, segments = selector.split(":") if ret_type not in ("v", "e", "r"): raise SyntaxError(f"Invalid selector: {selector}") segments = segments.split(".") ret = "" if ret_type == "v": ret = _transform_labeled_vertex_data_v(schema, *segments) elif ret_type == "e": ret = _transform_labeled_vertex_data_e(schema, *segments) elif ret_type == "r": ret = _transform_labeled_vertex_data_r(schema, *segments) return "{}:{}".format(ret_type, ret) def transform_labeled_vertex_property_data_selector(schema, selector): """Formats: 'v:x.y/id', 'e:x.y/src/dst', 'r:x.y', x denotes label name, y denotes property name. Returned selector will change label name to 'label{id}', where id is x's id in labels. And change property name to 'property{id}', where id is y's id in properties. """ if selector is None: raise RuntimeError("selector cannot be None") ret_type, segments = selector.split(":") if ret_type not in ("v", "e", "r"): raise SyntaxError(f"Invalid selector: {selector}") segments = segments.split(".") ret = "" if ret_type == "v": ret = _transform_labeled_vertex_data_v(schema, *segments) elif ret_type == "e": ret = _transform_labeled_vertex_data_e(schema, *segments) elif ret_type == "r": ret = _transform_labeled_vertex_property_data_r(schema, *segments) return f"{ret_type}:{ret}" _transform_selector_func_map = { "tensor": lambda _, _2: None, "vertex_data": transform_vertex_data_selector, "labeled_vertex_data": transform_labeled_vertex_data_selector, "vertex_property": transform_vertex_property_data_selector, "labeled_vertex_property": transform_labeled_vertex_property_data_selector, } def _transform_numpy_selector(context_type, schema, selector): return _transform_selector_func_map[context_type](schema, selector) def _transform_dataframe_selector(context_type, schema, selector): selector = json.loads(selector) transform_func = _transform_selector_func_map[context_type] selector = {key: transform_func(schema, value) for key, value in selector.items()} return json.dumps(selector) def _extract_gar(app_dir: str, attr): """Extract gar to workspace Args: workspace (str): Working directory attr (`AttrValue`): Optionally it can contains the bytes of gar. """ fp = BUILTIN_APP_RESOURCE_PATH # default is builtin app resources. if types_pb2.GAR in attr: # if gar sent via bytecode in attr, overwrite. fp = BytesIO(attr[types_pb2.GAR].s) with zipfile.ZipFile(fp, "r") as zip_ref: zip_ref.extractall(app_dir) def _parse_java_app_type(java_class_path, real_algo): _java_app_type = "" _frag_param_str = "" _java_inner_context_type = "" _vd_type = "" _java_executable = find_java_exe() parse_user_app_cmd = [ _java_executable, "-cp", "{}".format(java_class_path), "com.alibaba.graphscope.utils.AppBaseParser", real_algo, ] logger.info(" ".join(parse_user_app_cmd)) parse_user_app_process = subprocess.Popen( parse_user_app_cmd, env=os.environ.copy(), encoding="utf-8", errors="replace", stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, bufsize=1, ) out, err = parse_user_app_process.communicate() lines = out.split("\n") + err.split("\n") for line in lines: logger.info(line) if len(line) == 0: continue elif line.find("Giraph") != -1: _java_app_type = "giraph" elif line.find("DefaultPropertyApp") != -1: _java_app_type = "default_property" elif line.find("ParallelPropertyApp") != -1: _java_app_type = "parallel_property" elif line.find("DefaultAppBase") != -1: _java_app_type = "default_simple" elif line.find("ParallelAppBase") != -1: _java_app_type = "parallel_simple" elif line.find("Error") != -1: raise Exception("Error occurred in verifying user app") elif line.find("TypeParams") != -1: _frag_param_str = line.split(":")[-1].strip() elif line.find("ContextType") != -1: _java_inner_context_type = line.split(":")[-1].strip() elif line.find("VertexData") != -1: _vd_type = line.split(":")[-1].strip() # for giraph app, we manually set java inner ctx type logger.info( "Java app type: %s, frag type str: %s, ctx type: %s, vd type %s", _java_app_type, _frag_param_str, _java_inner_context_type, _vd_type, ) if ( not _java_app_type or not _frag_param_str or not _java_inner_context_type or not _vd_type ): raise RuntimeError("Parsed java app error") parse_user_app_process.wait() return _java_app_type, _frag_param_str, _java_inner_context_type, _vd_type def _probe_for_java_app(attr, java_class_path, real_algo): ( _java_app_type, _frag_param_str, _java_inner_context_type, _vd_type, ) = _parse_java_app_type(java_class_path, real_algo) if _java_app_type == "giraph": driver_header = "apps/java_pie/java_pie_projected_default_app.h" class_name = "gs::JavaPIEProjectedDefaultApp" elif _java_app_type == "default_property": driver_header = "apps/java_pie/java_pie_property_default_app.h" class_name = "gs::JavaPIEPropertyDefaultApp" elif _java_app_type == "parallel_property": driver_header = "apps/java_pie/java_pie_property_parallel_app.h" class_name = "gs::JavaPIEPropertyParallelApp" elif _java_app_type == "default_simple": driver_header = "apps/java_pie/java_pie_projected_default_app.h" class_name = "gs::JavaPIEProjectedDefaultApp" elif _java_app_type == "parallel_simple": driver_header = "apps/java_pie/java_pie_projected_parallel_app.h" class_name = "gs::JavaPIEProjectedParallelApp" else: raise RuntimeError(f"Not a supported java_app_type: {_java_app_type}") return driver_header, class_name, _vd_type, _frag_param_str def _codegen_app_info(attr, meta_file: str, java_class_path: str): """Codegen application by instantiate the template specialization. Args: workspace (str): Working directory meta_file (str): A yaml file that contains metas of all builtin app. attr (`AttrValue`): For get algorithm name of app. Raises: KeyError: If the algorithm name doesn't exist in the `meta_file` Returns: type: app_type app class: for fulfilling the CMakelists. """ fp = BUILTIN_APP_RESOURCE_PATH # default is builtin app resources. if types_pb2.GAR in attr: # if gar sent via bytecode in attr, overwrite. fp = BytesIO(attr[types_pb2.GAR].s) with zipfile.ZipFile(fp, "r") as zip_ref: with zip_ref.open(meta_file, "r") as f: config_yaml = yaml.safe_load(f) algo = attr[types_pb2.APP_ALGO].s.decode("utf-8", errors="ignore") # for algo start with giraph:, we don't find info in meta if algo.startswith("giraph:") or algo.startswith("java_pie:"): (app_type, real_algo) = algo.split(":") logger.info("codegen app info for java app: %s", real_algo) src_header, app_class, vd_type, java_app_template_str = _probe_for_java_app( attr, java_class_path, real_algo ) return ( app_type, src_header, "{}<_GRAPH_TYPE>".format(app_class), vd_type, None, None, java_class_path, "{}<{}>".format(real_algo, java_app_template_str), ) app_info = None for app in config_yaml["app"]: if app["algo"] == algo: app_type = app["type"] # cpp_pie or cython_pregel or cython_pie, java_pie if app_type in ("cpp_pie", "cpp_pregel", "cpp_flash"): app_info = ( app_type, app["src"], f"{app['class_name']}<_GRAPH_TYPE>", None, None, None, None, None, ) break if app_type in ("cython_pregel", "cython_pie"): # cython app doesn't have c-header file app_info = ( app_type, "", "", app["vd_type"], app["md_type"], app["pregel_combine"], None, None, ) break if app_info is None: raise KeyError("Algorithm does not exist in the gar resource.") return app_info # a mapping for class name to header file. GRAPH_HEADER_MAP = { graph_def_pb2.IMMUTABLE_EDGECUT: ( "grape::ImmutableEdgecutFragment", "grape/fragment/immutable_edgecut_fragment.h", ), graph_def_pb2.DYNAMIC_PROJECTED: ( "gs::DynamicProjectedFragment", "core/fragment/dynamic_projected_fragment.h", ), graph_def_pb2.ARROW_PROPERTY: ( "vineyard::ArrowFragment", "vineyard/graph/fragment/arrow_fragment.h", ), graph_def_pb2.ARROW_PROJECTED: ( "gs::ArrowProjectedFragment", "core/fragment/arrow_projected_fragment.h", ), graph_def_pb2.DYNAMIC_PROPERTY: ( "gs::DynamicFragment", "core/fragment/dynamic_fragment.h", ), graph_def_pb2.ARROW_FLATTENED: ( "gs::ArrowFlattenedFragment", "core/fragment/arrow_flattened_fragment.h", ), } VERETX_MAP_CLASS_MAP = { graph_def_pb2.GLOBAL_VERTEX_MAP: "vineyard::ArrowVertexMap<{},{}>", graph_def_pb2.LOCAL_VERTEX_MAP: "vineyard::ArrowLocalVertexMap<{},{}>", } def _codegen_graph_info(attr): # These getter functions are intended for lazy evaluation, # cause they are not always available in all types of graphs def oid_type(): if types_pb2.OID_TYPE in attr: return attr[types_pb2.OID_TYPE].s.decode("utf-8", errors="ignore") else: # DynamicProjectedFragment doesn't have oid return None def vid_type(): return attr[types_pb2.VID_TYPE].s.decode("utf-8", errors="ignore") def vdata_type(): return attr[types_pb2.V_DATA_TYPE].s.decode("utf-8", errors="ignore") def edata_type(): return attr[types_pb2.E_DATA_TYPE].s.decode("utf-8", errors="ignore") def vertex_map_type(): if types_pb2.VERTEX_MAP_TYPE not in attr: vm_type_enum = graph_def_pb2.GLOBAL_VERTEX_MAP else: vm_type_enum = attr[types_pb2.VERTEX_MAP_TYPE].i def internal_type(t): # The template of vertex map needs special care. if t == "std::string": return "vineyard::arrow_string_view" return t return VERETX_MAP_CLASS_MAP[vm_type_enum].format( internal_type(oid_type()), vid_type() ) def compact_edges(): compact_edges = False if types_pb2.COMPACT_EDGES in attr: compact_edges = attr[types_pb2.COMPACT_EDGES].b return "true" if compact_edges else "false" graph_type = attr[types_pb2.GRAPH_TYPE].i graph_class, graph_header = GRAPH_HEADER_MAP[graph_type] # graph_type is a literal of graph template in c++ side if graph_type == graph_def_pb2.ARROW_PROPERTY: # in a format of full qualified name, e.g. # vineyard::ArrowFragment<int64_t, uin64_t, vineyard::ArrowLocalVertexMap<int64_t, uint64_t>, false> graph_fqn = f"{graph_class}<{oid_type()},{vid_type()},{vertex_map_type()},{compact_edges()}>" elif graph_type == graph_def_pb2.ARROW_PROJECTED: # gs::ArrowProjectedFragment<int64_t, uint64_t, double, double,vineyard::ArrowLocalVertexMap<int64_t, uint64_t>, false> graph_fqn = f"{graph_class}<{oid_type()},{vid_type()},{vdata_type()},{edata_type()},{vertex_map_type()},{compact_edges()}>" # noqa: E501 elif graph_type == graph_def_pb2.IMMUTABLE_EDGECUT: # grape::ImmutableEdgecutFragment<int64_t, uint32_t, double, double> graph_fqn = ( f"{graph_class}<{oid_type()},{vid_type()},{vdata_type()},{edata_type()}>" ) elif graph_type == graph_def_pb2.ARROW_FLATTENED: # grape::ArrowFlattenFragment<int64_t, uint32_t, double, double> graph_fqn = ( f"{graph_class}<{oid_type()},{vid_type()},{vdata_type()},{edata_type()}>" ) elif graph_type == graph_def_pb2.DYNAMIC_PROJECTED: # gs::DynamicProjectedFragment<double, double> graph_fqn = f"{graph_class}<{vdata_type()},{edata_type()}>" else: raise ValueError( f"Unknown graph type: {graph_def_pb2.GraphTypePb.Name(graph_type)}" ) return graph_header, graph_fqn, oid_type(), vid_type() def create_single_op_dag(op_type, config=None): op_def = op_def_pb2.OpDef(op=op_type, key=uuid.uuid4().hex) if config: for k, v in config.items(): op_def.attr[k].CopyFrom(v) dag = op_def_pb2.DagDef() dag.op.extend([op_def]) return dag def dump_as_json(schema, path): out = {} items = [] idx = 0 for i, vertex_label in enumerate(schema.vertex_labels): vertex = { "id": idx, "label": vertex_label, "type": "VERTEX", "propertyDefList": [], } for j, value in enumerate(schema.vertex_property_names[i].s): names = schema.vertex_property_names[i] types = schema.vertex_property_types[i] vertex["propertyDefList"].append( {"id": j, "name": names.s[j], "data_type": types.s[j].upper()} ) vertex["indexes"] = [] vertex["indexes"].append({"propertyNames": [names.s[0]]}) items.append(vertex) idx += 1 for i, edge_label in enumerate(schema.edge_labels): edge = {"id": idx, "label": edge_label, "type": "EDGE", "propertyDefList": []} for j, value in enumerate(schema.edge_property_names[i].s): names = schema.edge_property_names[i] types = schema.edge_property_types[i] edge["propertyDefList"].append( {"id": j, "name": names.s[j], "data_type": types.s[j].upper()} ) edge["rawRelationShips"] = [] edge["rawRelationShips"].append( {"srcVertexLabel": "xx", "dstVertexLabel": "xx"} ) idx += 1 items.append(edge) out["types"] = items out["partitionNum"] = 4 with open(path, "w") as fp: json.dump(out, fp) def dump_string(schema_string, path): with open(path, "w") as fp: fp.write(schema_string) def parse_readable_memory(value): value = str(value).strip() num = value[:-2] suffix = value[-2:] try: float(num) except ValueError as e: raise ValueError(f"Argument cannot be interpreted as a number: {value}") from e if suffix not in ["Ki", "Mi", "Gi"]: raise ValueError(f"Memory suffix must be one of 'Ki', 'Mi' and 'Gi': {value}") return value def parse_as_glog_level(log_level): # log level in glog: INFO=1, DEBUG=10 # log level in python: DEBUG=10, INFO=20 if isinstance(log_level, str): if log_level == "silent" or log_level == "SILENT": log_level = -1 else: log_level = getattr(logging, log_level.upper()) python_to_glog = {0: 100, 10: 10, 20: 1} return python_to_glog.get(log_level, 0) def str2bool(s): if isinstance(s, bool): return s if s.lower() in ("yes", "true", "t", "y", "1"): return True return False class ResolveMPICmdPrefix(object): """ Class for resolving prefix of mpi command. Examples: .. code:: ipython >>> # openmpi found >>> rmcp = ResolveMPICmdPrefix() >>> (cmd, env) = rmcp.resolve(4, 'h1,h2,h3') >>> cmd ['mpirun', '--allow-run-as-root', '-n', '4', '-host', 'h1:2,h2:1,h3:1'] >>> env {'e': '/usr/local/bin/kube_ssh', # if kube_ssh in $PATH 'OMPI_MCA_btl_vader_single_copy_mechanism': 'none', 'OMPI_MCA_orte_allowed_exit_without_sync': '1'} >>> # if openmpi not found, change to mpich >>> rmcp = ResolveMPICmdPrefix() >>> (cmd, env) = rmcp.resolve(4, 'h1,h2,h3') >>> cmd ['mpirun', '-n', '4', '-host', 'h1:2,h2:1,h3:1'] >>> env {} # always empty >>> # run without mpi on localhost when setting `num_workers` to 1 >>> rmcp = ResolveMPICmdPrefix() >>> (cmd, env) = rmcp.resolve(1, 'localhost') >>> cmd [] >>> env {} """ _OPENMPI_RSH_AGENT = "OMPI_MCA_plm_rsh_agent" _KUBE_SSH_EXEC = "kube_ssh" def __init__(self, rsh_agent=False): self._rsh_agent = rsh_agent @staticmethod def openmpi(): ompi_info = "" if "OPAL_PREFIX" in os.environ: ompi_info = os.path.expandvars("$OPAL_PREFIX/bin/ompi_info") if not ompi_info: if "OPAL_BINDIR" in os.environ: ompi_info = os.path.expandvars("$OPAL_BINDIR/ompi_info") if not ompi_info: ompi_info = "ompi_info" try: subprocess.check_call([ompi_info], stdout=subprocess.DEVNULL) except FileNotFoundError: return False return True @staticmethod def alloc(num_workers, hosts): length = len(hosts) assert length != 0 proc_num = {} if num_workers >= length: quotient = num_workers / length residue = num_workers % length for host in hosts: if residue > 0: proc_num[host] = quotient + 1 residue -= 1 else: proc_num[host] = quotient else: raise RuntimeError("The number of hosts less then num_workers") return ",".join([f"{host}:{proc_num[host]}" for host in hosts]) @staticmethod def find_mpi(): mpi = None if ResolveMPICmdPrefix.openmpi(): if "OPAL_PREFIX" in os.environ: mpi = os.path.expandvars("$OPAL_PREFIX/bin/mpirun") if mpi is None: if "OPAL_BINDIR" in os.environ: mpi = os.path.expandvars("$OPAL_BINDIR/mpirun") if mpi is None: mpi = shutil.which("mpirun") if mpi is None: raise RuntimeError("mpirun command not found.") return mpi def resolve(self, num_workers: int, hosts: List[str]): cmd = [] env = {} if num_workers == 1 and hosts[0] in ("localhost", "127.0.0.1"): # run without mpi on localhost if workers num is 1 if shutil.which("ssh") is None: # also need a fake ssh agent env[self._OPENMPI_RSH_AGENT] = sys.executable return cmd, env if self.openmpi(): env["OMPI_MCA_btl_vader_single_copy_mechanism"] = "none" env["OMPI_MCA_orte_allowed_exit_without_sync"] = "1" # OMPI sends SIGCONT -> SIGTERM -> SIGKILL to the worker process, # set the following MCA parameter to zero will eliminate the chances # where the process dies before receiving the SIGTERM and do cleanup. env["OMPI_MCA_odls_base_sigkill_timeout"] = "0" if os.environ.get(self._OPENMPI_RSH_AGENT) is None: rsh_agent_path = shutil.which(self._KUBE_SSH_EXEC) if self._rsh_agent and rsh_agent_path is not None: env[self._OPENMPI_RSH_AGENT] = rsh_agent_path cmd.extend( [ self.find_mpi(), "--allow-run-as-root", "--bind-to", "none", ] ) else: # ssh agent supported only cmd.extend([self.find_mpi()]) cmd.extend(["-n", str(num_workers)]) cmd.extend(["-host", self.alloc(num_workers, hosts)]) logger.debug("Resolve mpi cmd prefix: %s", " ".join(cmd)) logger.debug("Resolve mpi env: %s", json.dumps(env)) return cmd, env # In Analytical engine, assume label ids of vertex entries are continuous # from zero, and property ids of each label is also continuous from zero. # When transform schema to interactive engine style, we gather all property names and # unique them, assign each name a id (index of the vector), then preserve a # vector<int> for each label, stores mappings from original id to transformed # id. def to_interactive_engine_schema(gsa_schema_json): gsa_schema = json.loads(gsa_schema_json) prop_set = set() vertex_label_num = 0 for item in gsa_schema["types"]: item["id"] = int(item["id"]) if item["type"] == "VERTEX": vertex_label_num += 1 for prop in item["propertyDefList"]: prop["id"] = int(prop["id"]) prop_set.add(prop["name"]) prop_list = sorted(list(prop_set)) mg_schema = copy.deepcopy(gsa_schema) for item in mg_schema["types"]: if item["propertyDefList"] == "": item["propertyDefList"] = [] if item["type"] == "VERTEX": for prop in item["propertyDefList"]: prop["id"] = 1 + prop_list.index(prop["name"]) elif item["type"] == "EDGE": item["id"] = vertex_label_num + item["id"] for prop in item["propertyDefList"]: prop["id"] = 1 + prop_list.index(prop["name"]) return json.dumps(mg_schema) def check_argument(condition, message=None): if not condition: if message is None: message = "in '%s'" % inspect.stack()[1].code_context[0] raise ValueError(f"Check failed: {message}") def check_server_ready(endpoint, server="gremlin"): def _check_gremlin_task(endpoint): from gremlin_python.driver.client import Client if "MY_POD_NAME" in os.environ: # inner kubernetes env if endpoint == "localhost" or endpoint == "127.0.0.1": # now, used in macOS with docker-desktop kubernetes cluster, # which external ip is 'localhost' when service type is 'LoadBalancer' logger.info("In kubernetes env, gremlin server is ready.") return True try: client = Client(f"ws://{endpoint}/gremlin", "g") # May throw client.submit("g.V().limit(1)").all().result() logger.info("Gremlin server is ready.") finally: try: client.close() except: # noqa: E722 pass return True def _check_cypher_task(endpoint): from neo4j import GraphDatabase if "MY_POD_NAME" in os.environ: # inner kubernetes env if endpoint == "localhost" or endpoint == "127.0.0.1": logger.info("In kubernetes env, cypher server is ready.") return True try: logger.debug("Try to connect to cypher server.") driver = GraphDatabase.driver(f"neo4j://{endpoint}", auth=("", "")) # May throw driver.verify_connectivity() logger.info("Checked connectivity to cypher server.") finally: try: driver.close() except: # noqa: E722 pass return True executor = ThreadPoolExecutor(max_workers=20) begin_time = time.time() while True: if server == "gremlin": t = executor.submit(_check_gremlin_task, endpoint) elif server == "cypher": t = executor.submit(_check_cypher_task, endpoint) else: raise ValueError( f"Unsupported server type: {server} other than 'gremlin' or 'cypher'" ) try: _ = t.result(timeout=30) except Exception as e: t.cancel() error_message = str(e) else: executor.shutdown(wait=False) return True time.sleep(3) if time.time() - begin_time > INTERACTIVE_INSTANCE_TIMEOUT_SECONDS: executor.shutdown(wait=False) raise TimeoutError( f"{server.capitalize()} check query failed: {error_message}" ) def replace_string_in_dict(dict_obj, old, new): if isinstance(dict_obj, dict): for key, value in dict_obj.items(): dict_obj[key] = replace_string_in_dict(value, old, new) elif isinstance(dict_obj, list): for index, item in enumerate(dict_obj): dict_obj[index] = replace_string_in_dict(item, old, new) elif isinstance(dict_obj, str): return dict_obj.replace(old, new) return dict_obj # Reference: https://gist.github.com/beugley/ccd69945346759eb6142272a6d69b4e0 def human_readable_to_bytes(size): """Given a human-readable byte string (e.g. 2G, 30M, 20K), return the number of bytes. Will raise an exception if the argument has unexpected form. """ # Try to parse the size as if the unit was coded on 1 char. try: numeric_size = float(size[:-1]) unit = size[-1] except ValueError: try: # Try to parse the size as if the unit was coded on 2 chars. numeric_size = float(size[:-2]) unit = size[-2:-1] except ValueError: raise ValueError("Can't convert %r to bytes" % size) unit = unit.upper() # Now we have a numeric value and a unit. Check the unit and convert to bytes. if unit == "G": bytes = numeric_size * 1073741824 elif unit == "M": bytes = numeric_size * 1048576 elif unit == "K": bytes = numeric_size * 1024 else: bytes = numeric_size return int(bytes)