analytical_engine/core/grape_instance.cc (1,352 lines of code) (raw):

/** Copyright 2020 Alibaba Group Holding Limited. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "core/grape_instance.h" #include <cstdint> #include <memory> #include <ostream> #include <utility> #include <vector> #include "arrow/array/array_base.h" #include "arrow/table.h" #include "arrow/type.h" #include "boost/leaf/error.hpp" #include "boost/leaf/result.hpp" #include "grape/communication/communicator.h" #include "grape/parallel/parallel_engine_spec.h" #include "grape/serialization/in_archive.h" #include "grape/worker/comm_spec.h" #include "vineyard/client/client.h" #include "vineyard/client/ds/i_object.h" #include "vineyard/common/util/status.h" #include "vineyard/common/util/uuid.h" #include "vineyard/graph/fragment/arrow_fragment_group.h" #include "vineyard/graph/loader/fragment_loader_utils.h" #include "vineyard/graph/utils/error.h" #include "vineyard/io/io/io_factory.h" #ifdef ENABLE_JAVA_SDK #include "core/context/java_pie_projected_context.h" #include "core/context/java_pie_property_context.h" #endif #ifdef NETWORKX #include "core/object/dynamic.h" #endif #include "core/context/i_context.h" #include "core/context/labeled_vertex_property_context.h" #include "core/context/selector.h" #include "core/context/tensor_context.h" #include "core/context/vertex_data_context.h" #include "core/context/vertex_property_context.h" #include "core/fragment/dynamic_fragment.h" #include "core/io/property_parser.h" #include "core/launcher.h" #include "core/object/app_entry.h" 
#include "core/object/fragment_wrapper.h"
#include "core/object/graph_utils.h"
#include "core/object/i_fragment_wrapper.h"
#include "core/object/projector.h"
#include "core/server/command_detail.h"
#include "core/server/rpc_utils.h"
#include "core/utils/mpi_utils.h"
#include "proto/attr_value.pb.h"
#include "proto/graph_def.pb.h"
#include "proto/types.pb.h"

namespace bl = boost::leaf;

namespace gs {

namespace rpc {
class QueryArgs;
}  // namespace rpc

GrapeInstance::GrapeInstance(const grape::CommSpec& comm_spec)
    : comm_spec_(comm_spec) {}

// Initializes the vineyard IO factory and connects `client_` to the vineyard
// server listening on `vineyard_socket`.
void GrapeInstance::Init(const std::string& vineyard_socket) {
  // force link vineyard_io library for graph/app compilation
  vineyard::IOFactory::Init();

  EnsureClient(client_, vineyard_socket);
  if (comm_spec_.worker_id() == grape::kCoordinatorRank) {
    VLOG(1) << "Workers of grape-engine initialized.";
  }
}

// Loads a new graph of the type given by GRAPH_TYPE, registers its wrapper in
// the object manager under a freshly generated "graph_<id>" key and returns
// the graph definition.
//
// DYNAMIC_PROPERTY creates an empty DynamicFragment (requires NETWORKX=ON);
// ARROW_PROPERTY delegates to the PropertyGraphUtils object registered under
// TYPE_SIGNATURE.
bl::result<rpc::graph::GraphDefPb> GrapeInstance::loadGraph(
    const rpc::GSParams& params) {
  std::string graph_name = "graph_" + generateId();

  BOOST_LEAF_AUTO(graph_type,
                  params.Get<rpc::graph::GraphTypePb>(rpc::GRAPH_TYPE));

  switch (graph_type) {
  case rpc::graph::DYNAMIC_PROPERTY: {
#ifdef NETWORKX
    using fragment_t = DynamicFragment;
    using vertex_map_t = typename fragment_t::vertex_map_t;
    BOOST_LEAF_AUTO(directed, params.Get<bool>(rpc::DIRECTED));

    VLOG(1) << "Loading graph, graph name: " << graph_name
            << ", graph type: DynamicFragment, directed: " << directed;

    auto vm_ptr = std::make_shared<vertex_map_t>(comm_spec_);
    vm_ptr->Init();
    typename vertex_map_t::partitioner_t partitioner(comm_spec_.fnum());
    vm_ptr->SetPartitioner(partitioner);

    auto fragment = std::make_shared<fragment_t>(vm_ptr);
    fragment->Init(comm_spec_.fid(), directed);

    rpc::graph::GraphDefPb graph_def;

    graph_def.set_key(graph_name);
    graph_def.set_directed(directed);
    graph_def.set_graph_type(rpc::graph::DYNAMIC_PROPERTY);
    graph_def.set_compact_edges(false);
    // dynamic graph doesn't have a vineyard id
    gs::rpc::graph::MutableGraphInfoPb graph_info;
    if (graph_def.has_extension()) {
      graph_def.extension().UnpackTo(&graph_info);
    }
    graph_info.set_property_schema_json(
        dynamic::Stringify(fragment->GetSchema()));
    graph_def.mutable_extension()->PackFrom(graph_info);

    auto wrapper = std::make_shared<FragmentWrapper<fragment_t>>(
        graph_name, graph_def, fragment);

    BOOST_LEAF_CHECK(object_manager_.PutObject(wrapper));

    return graph_def;
#else
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "GraphScope is built with NETWORKX=OFF, please recompile "
                    "it with NETWORKX=ON");
#endif
  }
  case rpc::graph::ARROW_PROPERTY: {
    BOOST_LEAF_AUTO(type_sig, params.Get<std::string>(rpc::TYPE_SIGNATURE));

    VLOG(1) << "Loading graph, graph name: " << graph_name
            << ", graph type: ArrowFragment, type sig: " << type_sig;

#ifdef ENABLE_JAVA_SDK
    // It is possible to has no JAVA_CLASS_PATH and JVM_OPTS when we loading
    // graph via addLabels, e.t.c.
    if (params.HasKey(rpc::JAVA_CLASS_PATH)) {
      BOOST_LEAF_AUTO(user_jar_path,
                      params.Get<std::string>(rpc::JAVA_CLASS_PATH));
      setenv("USER_JAR_PATH", user_jar_path.c_str(), true);
      VLOG(10) << "USER_JAR_PATH: " << user_jar_path;
    }
    if (params.HasKey(rpc::JVM_OPTS)) {
      BOOST_LEAF_AUTO(jvm_opts, params.Get<std::string>(rpc::JVM_OPTS));
      setenv("GRAPE_JVM_OPTS", jvm_opts.c_str(), true);
      VLOG(10) << "GRAPE_JVM_OPTS:" << jvm_opts;
    }
#endif

    BOOST_LEAF_AUTO(graph_utils,
                    object_manager_.GetObject<PropertyGraphUtils>(type_sig));
    BOOST_LEAF_AUTO(wrapper, graph_utils->LoadGraph(comm_spec_, *client_,
                                                    graph_name, params));
    BOOST_LEAF_CHECK(object_manager_.PutObject(wrapper));

    return wrapper->graph_def();
  }
  default:
    RETURN_GS_ERROR(
        vineyard::ErrorCode::kInvalidValueError,
        "Unsupported graph type " + rpc::graph::GraphTypePb_Name(graph_type));
  }
}

// Removes the graph GRAPH_NAME from the object manager. When VINEYARD_ID is
// given and the fragment group exists in vineyard, the coordinator deletes the
// group object first, then every worker deletes its own fragment; the MPI
// barriers keep all workers in lock-step around those deletions.
bl::result<void> GrapeInstance::unloadGraph(const rpc::GSParams& params) {
  BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
  if (params.HasKey(rpc::VINEYARD_ID)) {
    BOOST_LEAF_AUTO(frag_group_id, params.Get<int64_t>(rpc::VINEYARD_ID));
    bool exists = false;
    VY_OK_OR_RAISE(client_->Exists(frag_group_id, exists));
    if (exists) {
      std::shared_ptr<vineyard::ArrowFragmentGroup> fg;
      VY_OK_OR_RAISE(client_->GetObject(frag_group_id, fg));
      auto fid = comm_spec_.WorkerToFrag(comm_spec_.worker_id());
      auto frag_id = fg->Fragments().at(fid);

      // ensure all workers obtain the expected information
      MPI_Barrier(comm_spec_.comm());

      // delete the fragment group first
      if (comm_spec_.worker_id() == grape::kCoordinatorRank) {
#if defined(VINEYARD_VERSION) && VINEYARD_VERSION >= 21003
        VINEYARD_SUPPRESS(client_->DelData(frag_group_id, false, true, true));
#else
        VINEYARD_SUPPRESS(client_->DelData(frag_group_id, false, true));
#endif
      }

      // ensure the group is gone before each worker deletes its fragment
      MPI_Barrier(comm_spec_.comm());

#if defined(VINEYARD_VERSION) && VINEYARD_VERSION >= 21003
      VINEYARD_SUPPRESS(client_->DelData(frag_id, false, true, true));
#else
      VINEYARD_SUPPRESS(client_->DelData(frag_id, false, true));
#endif
    }
  }
  VLOG(1) << "Unloading Graph " << graph_name;
  return object_manager_.RemoveObject(graph_name);
}

// Archives the vineyard fragment group VINEYARD_ID via the PropertyGraphUtils
// registered under TYPE_SIGNATURE. Does nothing when VINEYARD_ID is absent;
// fails when the group does not exist.
bl::result<void> GrapeInstance::archiveGraph(const rpc::GSParams& params) {
  if (params.HasKey(rpc::VINEYARD_ID)) {
    BOOST_LEAF_AUTO(type_sig, params.Get<std::string>(rpc::TYPE_SIGNATURE));
    BOOST_LEAF_AUTO(frag_group_id, params.Get<int64_t>(rpc::VINEYARD_ID));
    BOOST_LEAF_AUTO(graph_utils,
                    object_manager_.GetObject<PropertyGraphUtils>(type_sig));

    bool exists = false;
    VY_OK_OR_RAISE(client_->Exists(frag_group_id, exists));
    if (exists) {
      BOOST_LEAF_CHECK(graph_utils->ArchiveGraph(frag_group_id, comm_spec_,
                                                 *client_, params));
    } else {
      RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                      "Fragment group " + std::to_string(frag_group_id) +
                          " does not exist");
    }
  }
  return {};
}

// Loads the application library APP_LIBRARY_PATH for algorithm APP_ALGO,
// registers it as "app_<algo>_<id>" and returns that name.
bl::result<std::string> GrapeInstance::loadApp(const rpc::GSParams& params) {
  BOOST_LEAF_AUTO(algo_name, params.Get<std::string>(rpc::APP_ALGO));

  std::string app_name = "app_" + algo_name + "_" + generateId();

  BOOST_LEAF_AUTO(lib_path, params.Get<std::string>(rpc::APP_LIBRARY_PATH));

  auto app = std::make_shared<AppEntry>(app_name, lib_path);

  VLOG(1) << "Loading application, application name: " << app_name
          << " , library path: " << lib_path;

  BOOST_LEAF_CHECK(app->Init());
  BOOST_LEAF_CHECK(object_manager_.PutObject(app));
  return app_name;
}

// Removes the application APP_NAME from the object manager.
bl::result<void> GrapeInstance::unloadApp(const rpc::GSParams& params) {
  BOOST_LEAF_AUTO(app_name, params.Get<std::string>(rpc::APP_NAME));
  return object_manager_.RemoveObject(app_name);
}

// Projects the ArrowFragment GRAPH_NAME onto the vertex/edge properties parsed
// from `params`, registers the result as a new graph and returns its def.
bl::result<rpc::graph::GraphDefPb> GrapeInstance::projectGraph(
    const rpc::GSParams& params) {
  BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
  BOOST_LEAF_AUTO(project_infos, gs::ParseProjectPropertyGraph(params));
  BOOST_LEAF_AUTO(
      frag_wrapper,
      object_manager_.GetObject<ILabeledFragmentWrapper>(graph_name));

  if (frag_wrapper->graph_def().graph_type() != rpc::graph::ARROW_PROPERTY) {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "projectGraph is only available for ArrowFragment");
  }

  std::string dst_graph_name = "graph_" + generateId();

  BOOST_LEAF_AUTO(new_frag_wrapper,
                  frag_wrapper->Project(comm_spec_, dst_graph_name,
                                        project_infos[0], project_infos[1]));
  BOOST_LEAF_CHECK(object_manager_.PutObject(new_frag_wrapper));
  return new_frag_wrapper->graph_def();
}

// Projects GRAPH_NAME to a simple (non-property) graph using the Projector
// registered under TYPE_SIGNATURE. If the projected graph carries a non-zero
// vineyard id, the fragment is persisted, a fragment group is built for it,
// and the returned graph def carries the group id instead.
bl::result<rpc::graph::GraphDefPb> GrapeInstance::projectToSimple(
    const rpc::GSParams& params) {
  std::string projected_graph_name = "graph_projected_" + generateId();
  BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
  BOOST_LEAF_AUTO(type_sig, params.Get<std::string>(rpc::TYPE_SIGNATURE));

  VLOG(1) << "Projecting graph " << graph_name
          << " to simple graph: " << projected_graph_name
          << ", type sig: " << type_sig;

  BOOST_LEAF_AUTO(wrapper,
                  object_manager_.GetObject<IFragmentWrapper>(graph_name));
  BOOST_LEAF_AUTO(projector, object_manager_.GetObject<Projector>(type_sig));
  BOOST_LEAF_AUTO(projected_wrapper,
                  projector->Project(wrapper, projected_graph_name, params));
  BOOST_LEAF_CHECK(object_manager_.PutObject(projected_wrapper));

  auto graph_def = projected_wrapper->graph_def();
  if (!graph_def.has_extension()) {
    return graph_def;
  }
  gs::rpc::graph::VineyardInfoPb vy_info;
  // gather fragment id; if the extension is not a VineyardInfoPb the unpack
  // fails, vineyard_id stays 0 and the def is returned unchanged below
  graph_def.extension().UnpackTo(&vy_info);
  if (vy_info.vineyard_id() == 0) {
    return graph_def;
  }
  VY_OK_OR_RAISE(client_->Persist(vy_info.vineyard_id()));
  // construct fragment group
  BOOST_LEAF_AUTO(frag_group_id,
                  vineyard::ConstructFragmentGroup(
                      *client_, vy_info.vineyard_id(), comm_spec_));
  // return graph def with the fragment group id attached
  vy_info.set_vineyard_id(frag_group_id);
  graph_def.mutable_extension()->PackFrom(vy_info);
  return graph_def;
}

// Runs application APP_NAME on graph GRAPH_NAME, stores the resulting context
// under a fresh "ctx_<id>" key, and returns a JSON object with the context
// type, key and schema.
bl::result<std::string> GrapeInstance::query(const rpc::GSParams& params,
                                             const rpc::QueryArgs& query_args) {
  BOOST_LEAF_AUTO(app_name, params.Get<std::string>(rpc::APP_NAME));
  BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
  BOOST_LEAF_AUTO(app, object_manager_.GetObject<AppEntry>(app_name));
  BOOST_LEAF_AUTO(wrapper,
                  object_manager_.GetObject<IFragmentWrapper>(graph_name));

  VLOG(1) << "Query app, application name: " << app_name
          << ", graph name: " << graph_name;

  auto fragment = wrapper->fragment();
  auto spec = grape::DefaultParallelEngineSpec();
  std::string context_key = "ctx_" + generateId();

  BOOST_LEAF_AUTO(worker, app->CreateWorker(fragment, comm_spec_, spec));
  BOOST_LEAF_AUTO(ctx_wrapper, app->Query(worker.get(), query_args,
                                          context_key, wrapper));

  if (ctx_wrapper == nullptr) {
    RETURN_GS_ERROR(
        vineyard::ErrorCode::kIllegalStateError,
        "Query returns a null context wrapper without useful error message");
  }

  std::string context_type = ctx_wrapper->context_type();
  std::string context_schema = ctx_wrapper->schema();
  BOOST_LEAF_CHECK(object_manager_.PutObject(ctx_wrapper));

  return toJson({{"context_type", context_type},
                 {"context_key", context_key},
                 {"context_schema", context_schema}});
}

// Removes the context CONTEXT_KEY from the object manager.
bl::result<void> GrapeInstance::unloadContext(const rpc::GSParams& params) {
  BOOST_LEAF_AUTO(context_key, params.Get<std::string>(rpc::CONTEXT_KEY));
  return object_manager_.RemoveObject(context_key);
}

// Delegates a report request on GRAPH_NAME to its fragment wrapper.
bl::result<std::shared_ptr<grape::InArchive>> GrapeInstance::reportGraph(
    const rpc::GSParams& params) {
  BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
  BOOST_LEAF_AUTO(wrapper,
                  object_manager_.GetObject<IFragmentWrapper>(graph_name));

  return wrapper->ReportGraph(comm_spec_, params);
}

// Adds/updates/removes vertices of a DYNAMIC_PROPERTY graph in place.
// The vertices come as JSON in the first chunk of the large attribute; the
// PROPERTIES param holds attributes common to all of them. The graph schema
// stored in the graph def is refreshed afterwards. Requires NETWORKX=ON.
bl::result<rpc::graph::GraphDefPb> GrapeInstance::modifyVertices(
    const rpc::GSParams& params) {
#ifdef NETWORKX
  BOOST_LEAF_AUTO(modify_type, params.Get<rpc::ModifyType>(rpc::MODIFY_TYPE));
  BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
  BOOST_LEAF_AUTO(wrapper,
                  object_manager_.GetObject<IFragmentWrapper>(graph_name));
  auto& graph_def = wrapper->mutable_graph_def();
  auto graph_type = graph_def.graph_type();
  if (graph_type != rpc::graph::DYNAMIC_PROPERTY) {
    RETURN_GS_ERROR(
        vineyard::ErrorCode::kInvalidValueError,
        "GraphType must be DYNAMIC_PROPERTY, the origin graph type is: " +
            rpc::graph::GraphTypePb_Name(graph_type) +
            ", graph id: " + graph_name);
  }
  BOOST_LEAF_AUTO(common_attr_json, params.Get<std::string>(rpc::PROPERTIES));
  dynamic::Value common_attr, nodes;
  // the common attribute for all nodes to be modified
  dynamic::Parse(common_attr_json, common_attr);
  // indexing an empty repeated field is undefined behavior; reject instead
  if (params.GetLargeAttr().chunk_list().items_size() == 0) {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                    "No vertices payload found in the request, graph name: " +
                        graph_name);
  }
  std::string nodes_json =
      params.GetLargeAttr().chunk_list().items()[0].buffer();
  dynamic::Parse(nodes_json, nodes);
  auto fragment =
      std::static_pointer_cast<DynamicFragment>(wrapper->fragment());
  DynamicFragmentMutator mutator(comm_spec_, fragment);
  mutator.ModifyVertices(nodes, common_attr, modify_type);
  // update schema in graph_def
  gs::rpc::graph::MutableGraphInfoPb graph_info;
  if (graph_def.has_extension()) {
    graph_def.extension().UnpackTo(&graph_info);
  }
  graph_info.set_property_schema_json(
      dynamic::Stringify(fragment->GetSchema()));
  graph_def.mutable_extension()->PackFrom(graph_info);
  return graph_def;
#else
  RETURN_GS_ERROR(vineyard::ErrorCode::kUnimplementedMethod,
                  "GraphScope is built with NETWORKX=OFF, please recompile it "
                  "with NETWORKX=ON");
#endif
}

// Adds/updates/removes edges of a DYNAMIC_PROPERTY graph in place.
// The edges come as JSON in the first chunk of the large attribute; the
// PROPERTIES param holds attributes common to all of them, and the optional
// EDGE_KEY param names the weight attribute passed to the mutator. The graph
// schema stored in the graph def is refreshed afterwards. Requires NETWORKX=ON.
bl::result<rpc::graph::GraphDefPb> GrapeInstance::modifyEdges(
    const rpc::GSParams& params) {
#ifdef NETWORKX
  BOOST_LEAF_AUTO(modify_type, params.Get<rpc::ModifyType>(rpc::MODIFY_TYPE));
  BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
  BOOST_LEAF_AUTO(wrapper,
                  object_manager_.GetObject<IFragmentWrapper>(graph_name));
  auto& graph_def = wrapper->mutable_graph_def();
  auto graph_type = graph_def.graph_type();
  if (graph_type != rpc::graph::DYNAMIC_PROPERTY) {
    RETURN_GS_ERROR(
        vineyard::ErrorCode::kInvalidValueError,
        "GraphType must be DYNAMIC_PROPERTY, the origin graph type is: " +
            rpc::graph::GraphTypePb_Name(graph_type) +
            ", graph name: " + graph_name);
  }
  BOOST_LEAF_AUTO(common_attr_json, params.Get<std::string>(rpc::PROPERTIES));
  dynamic::Value common_attr, edges;
  // the common attribute for all edges to be modified
  dynamic::Parse(common_attr_json, common_attr);
  std::string weight = "";
  if (params.HasKey(rpc::EDGE_KEY)) {
    // ASSIGN (not AUTO): AUTO would declare a shadowing local inside this
    // scope and leave the outer `weight` empty.
    BOOST_LEAF_ASSIGN(weight, params.Get<std::string>(rpc::EDGE_KEY));
  }
  // indexing an empty repeated field is undefined behavior; reject instead
  if (params.GetLargeAttr().chunk_list().items_size() == 0) {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                    "No edges payload found in the request, graph name: " +
                        graph_name);
  }
  std::string edges_json =
      params.GetLargeAttr().chunk_list().items()[0].buffer();
  dynamic::Parse(edges_json, edges);
  auto fragment =
      std::static_pointer_cast<DynamicFragment>(wrapper->fragment());
  DynamicFragmentMutator mutator(comm_spec_, fragment);
  mutator.ModifyEdges(edges, common_attr, modify_type, weight);
  // update schema in graph_def
  gs::rpc::graph::MutableGraphInfoPb graph_info;
  if (graph_def.has_extension()) {
    graph_def.extension().UnpackTo(&graph_info);
  }
  graph_info.set_property_schema_json(
      dynamic::Stringify(fragment->GetSchema()));
  graph_def.mutable_extension()->PackFrom(graph_info);
  return graph_def;
#else
  RETURN_GS_ERROR(vineyard::ErrorCode::kUnimplementedMethod,
                  "GraphScope is built with NETWORKX=OFF, please recompile it "
                  "with NETWORKX=ON");
#endif  // NETWORKX
}

// Serializes the selected part of a context into an ndarray archive.
// Dispatches on the context type string; tensor contexts use the AXIS param,
// the others parse a (labeled) selector and honor the vertex range.
bl::result<std::shared_ptr<grape::InArchive>> GrapeInstance::contextToNumpy(
    const rpc::GSParams& params) {
  std::string s_selector;
  std::pair<std::string, std::string> range;
  std::shared_ptr<IContextWrapper> base_ctx_wrapper;

  BOOST_LEAF_CHECK(
      getContextDetails(params, &s_selector, &range, &base_ctx_wrapper));

  auto ctx_type = base_ctx_wrapper->context_type();

  if (ctx_type == CONTEXT_TYPE_TENSOR) {
    auto wrapper =
        std::dynamic_pointer_cast<ITensorContextWrapper>(base_ctx_wrapper);
    BOOST_LEAF_AUTO(axis, params.Get<int64_t>(rpc::AXIS));
    return wrapper->ToNdArray(comm_spec_, axis);
  } else if (ctx_type == CONTEXT_TYPE_VERTEX_DATA) {
    auto wrapper =
        std::dynamic_pointer_cast<IVertexDataContextWrapper>(base_ctx_wrapper);
    BOOST_LEAF_AUTO(selector, Selector::parse(s_selector));
    return wrapper->ToNdArray(comm_spec_, selector, range);
  } else if (ctx_type == CONTEXT_TYPE_LABELED_VERTEX_DATA) {
    auto wrapper = std::dynamic_pointer_cast<ILabeledVertexDataContextWrapper>(
        base_ctx_wrapper);
    BOOST_LEAF_AUTO(selector, LabeledSelector::parse(s_selector));
    return wrapper->ToNdArray(comm_spec_, selector, range);
  } else if (ctx_type == CONTEXT_TYPE_VERTEX_PROPERTY) {
    auto wrapper = std::dynamic_pointer_cast<IVertexPropertyContextWrapper>(
        base_ctx_wrapper);
    BOOST_LEAF_AUTO(selector, Selector::parse(s_selector));
    return wrapper->ToNdArray(comm_spec_, selector, range);
  } else if (ctx_type == CONTEXT_TYPE_LABELED_VERTEX_PROPERTY) {
    auto wrapper =
        std::dynamic_pointer_cast<ILabeledVertexPropertyContextWrapper>(
            base_ctx_wrapper);
    BOOST_LEAF_AUTO(selector, LabeledSelector::parse(s_selector));
    return wrapper->ToNdArray(comm_spec_, selector, range);
#ifdef ENABLE_JAVA_SDK
  } else if (ctx_type.find(CONTEXT_TYPE_JAVA_PIE_PROPERTY) !=
             std::string::npos) {
    std::vector<std::string> outer_and_inner;
    boost::split(outer_and_inner, ctx_type, boost::is_any_of(":"));
    if (outer_and_inner.size() != 2) {
      RETURN_GS_ERROR(
          vineyard::ErrorCode::kIllegalStateError,
          "Unsupported java property context type: " + std::string(ctx_type));
    }
    auto wrapper = std::dynamic_pointer_cast<IJavaPIEPropertyContextWrapper>(
        base_ctx_wrapper);
    BOOST_LEAF_AUTO(selector, LabeledSelector::parse(s_selector));
    return wrapper->ToNdArray(comm_spec_, selector, range);
  } else if (ctx_type.find(CONTEXT_TYPE_JAVA_PIE_PROJECTED) !=
             std::string::npos) {
    std::vector<std::string> outer_and_inner;
    boost::split(outer_and_inner, ctx_type, boost::is_any_of(":"));
    if (outer_and_inner.size() != 2) {
      RETURN_GS_ERROR(
          vineyard::ErrorCode::kInvalidValueError,
          "Unsupported java projected context type: " + std::string(ctx_type));
    }
    auto wrapper = std::dynamic_pointer_cast<IJavaPIEProjectedContextWrapper>(
        base_ctx_wrapper);
    BOOST_LEAF_AUTO(selector, Selector::parse(s_selector));
    return wrapper->ToNdArray(comm_spec_, selector, range);
#endif
  }
  RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                  "Unsupported context type: " + std::string(ctx_type));
}

// Returns the data of context CONTEXT_KEY as produced by
// IVertexDataContextWrapper::GetContextData. Fails for any context that is not
// a vertex-data context.
bl::result<std::string> GrapeInstance::getContextData(
    const rpc::GSParams& params) {
  BOOST_LEAF_AUTO(context_key, params.Get<std::string>(rpc::CONTEXT_KEY));
  BOOST_LEAF_AUTO(base_ctx_wrapper,
                  object_manager_.GetObject<IContextWrapper>(context_key));

  auto wrapper =
      std::dynamic_pointer_cast<IVertexDataContextWrapper>(base_ctx_wrapper);
  // the cast yields nullptr for other context kinds; don't dereference it
  if (wrapper == nullptr) {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                    "Unsupported context type for getting context data: " +
                        std::string(base_ctx_wrapper->context_type()));
  }
  return wrapper->GetContextData(params);
}

// Serializes the selected columns of a context into a dataframe archive.
// Tensor contexts are converted whole; the others parse a list of (labeled)
// selectors and honor the vertex range.
bl::result<std::shared_ptr<grape::InArchive>> GrapeInstance::contextToDataframe(
    const rpc::GSParams& params) {
  std::string s_selector;
  std::pair<std::string, std::string> range;
  std::shared_ptr<IContextWrapper> base_ctx_wrapper;

  BOOST_LEAF_CHECK(
      getContextDetails(params, &s_selector, &range, &base_ctx_wrapper));

  auto ctx_type = base_ctx_wrapper->context_type();

  if (ctx_type == CONTEXT_TYPE_TENSOR) {
    auto wrapper =
        std::dynamic_pointer_cast<ITensorContextWrapper>(base_ctx_wrapper);
    return wrapper->ToDataframe(comm_spec_);
  } else if (ctx_type == CONTEXT_TYPE_VERTEX_DATA) {
    auto wrapper =
        std::dynamic_pointer_cast<IVertexDataContextWrapper>(base_ctx_wrapper);
    BOOST_LEAF_AUTO(selectors, Selector::ParseSelectors(s_selector));
    return wrapper->ToDataframe(comm_spec_, selectors, range);
  } else if (ctx_type == CONTEXT_TYPE_LABELED_VERTEX_DATA) {
    auto wrapper = std::dynamic_pointer_cast<ILabeledVertexDataContextWrapper>(
        base_ctx_wrapper);
    BOOST_LEAF_AUTO(selectors, LabeledSelector::ParseSelectors(s_selector));
    return wrapper->ToDataframe(comm_spec_, selectors, range);
  } else if (ctx_type == CONTEXT_TYPE_VERTEX_PROPERTY) {
    auto wrapper = std::dynamic_pointer_cast<IVertexPropertyContextWrapper>(
        base_ctx_wrapper);
    BOOST_LEAF_AUTO(selectors, Selector::ParseSelectors(s_selector));
    return wrapper->ToDataframe(comm_spec_, selectors, range);
  } else if (ctx_type == CONTEXT_TYPE_LABELED_VERTEX_PROPERTY) {
    auto wrapper =
        std::dynamic_pointer_cast<ILabeledVertexPropertyContextWrapper>(
            base_ctx_wrapper);
    BOOST_LEAF_AUTO(selectors, LabeledSelector::ParseSelectors(s_selector));
    return wrapper->ToDataframe(comm_spec_, selectors, range);
#ifdef ENABLE_JAVA_SDK
  } else if (ctx_type.find(CONTEXT_TYPE_JAVA_PIE_PROPERTY) !=
             std::string::npos) {
    std::vector<std::string> outer_and_inner;
    boost::split(outer_and_inner, ctx_type, boost::is_any_of(":"));
    if (outer_and_inner.size() != 2) {
      RETURN_GS_ERROR(
          vineyard::ErrorCode::kIllegalStateError,
          "Unsupported java property context type: " + std::string(ctx_type));
    }
    auto wrapper = std::dynamic_pointer_cast<IJavaPIEPropertyContextWrapper>(
        base_ctx_wrapper);
    BOOST_LEAF_AUTO(selectors, LabeledSelector::ParseSelectors(s_selector));
    return wrapper->ToDataframe(comm_spec_, selectors, range);
  } else if (ctx_type.find(CONTEXT_TYPE_JAVA_PIE_PROJECTED) !=
             std::string::npos) {
    std::vector<std::string> outer_and_inner;
    boost::split(outer_and_inner, ctx_type, boost::is_any_of(":"));
    if (outer_and_inner.size() != 2) {
      RETURN_GS_ERROR(
          vineyard::ErrorCode::kInvalidValueError,
          "Unsupported java projected context type: " + std::string(ctx_type));
    }
    auto wrapper = std::dynamic_pointer_cast<IJavaPIEProjectedContextWrapper>(
        base_ctx_wrapper);
    BOOST_LEAF_AUTO(selectors, Selector::ParseSelectors(s_selector));
    return wrapper->ToDataframe(comm_spec_, selectors, range);
#endif
  }
  RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                  "Unsupported context type: " + std::string(ctx_type));
}

// Writes the selected part of a context into a vineyard tensor, names the
// object after its id string and returns {"object_id": <id>} as JSON.
bl::result<std::string> GrapeInstance::contextToVineyardTensor(
    const rpc::GSParams& params) {
  std::string s_selector;
  std::pair<std::string, std::string> range;
  std::shared_ptr<IContextWrapper> base_ctx_wrapper;

  BOOST_LEAF_CHECK(
      getContextDetails(params, &s_selector, &range, &base_ctx_wrapper));

  auto ctx_type = base_ctx_wrapper->context_type();
  // every branch below either assigns `id` or returns an error
  vineyard::ObjectID id;

  if (ctx_type == CONTEXT_TYPE_TENSOR) {
    auto wrapper =
        std::dynamic_pointer_cast<ITensorContextWrapper>(base_ctx_wrapper);
    BOOST_LEAF_AUTO(axis, params.Get<int64_t>(rpc::AXIS));

    BOOST_LEAF_ASSIGN(id,
                      wrapper->ToVineyardTensor(comm_spec_, *client_, axis));
  } else if (ctx_type == CONTEXT_TYPE_VERTEX_DATA) {
    auto wrapper =
        std::dynamic_pointer_cast<IVertexDataContextWrapper>(base_ctx_wrapper);
    BOOST_LEAF_AUTO(selector, Selector::parse(s_selector));
    BOOST_LEAF_ASSIGN(
        id, wrapper->ToVineyardTensor(comm_spec_, *client_, selector, range));
  } else if (ctx_type == CONTEXT_TYPE_LABELED_VERTEX_DATA) {
    auto wrapper = std::dynamic_pointer_cast<ILabeledVertexDataContextWrapper>(
        base_ctx_wrapper);
    BOOST_LEAF_AUTO(selector, LabeledSelector::parse(s_selector));
    BOOST_LEAF_ASSIGN(
        id, wrapper->ToVineyardTensor(comm_spec_, *client_, selector, range));
  } else if (ctx_type == CONTEXT_TYPE_VERTEX_PROPERTY) {
    auto wrapper = std::dynamic_pointer_cast<IVertexPropertyContextWrapper>(
        base_ctx_wrapper);
    BOOST_LEAF_AUTO(selector, Selector::parse(s_selector));
    BOOST_LEAF_ASSIGN(
        id, wrapper->ToVineyardTensor(comm_spec_, *client_, selector, range));
  } else if (ctx_type == CONTEXT_TYPE_LABELED_VERTEX_PROPERTY) {
    auto wrapper =
        std::dynamic_pointer_cast<ILabeledVertexPropertyContextWrapper>(
            base_ctx_wrapper);
    BOOST_LEAF_AUTO(selector, LabeledSelector::parse(s_selector));
    BOOST_LEAF_ASSIGN(
        id, wrapper->ToVineyardTensor(comm_spec_, *client_, selector, range));
#ifdef ENABLE_JAVA_SDK
  } else if (ctx_type.find(CONTEXT_TYPE_JAVA_PIE_PROPERTY) !=
             std::string::npos) {
    std::vector<std::string> outer_and_inner;
    boost::split(outer_and_inner, ctx_type, boost::is_any_of(":"));
    if (outer_and_inner.size() != 2) {
      RETURN_GS_ERROR(
          vineyard::ErrorCode::kIllegalStateError,
          "Unsupported java property context type: " + std::string(ctx_type));
    }
    auto wrapper = std::dynamic_pointer_cast<IJavaPIEPropertyContextWrapper>(
        base_ctx_wrapper);
    BOOST_LEAF_AUTO(selector, LabeledSelector::parse(s_selector));
    BOOST_LEAF_ASSIGN(
        id, wrapper->ToVineyardTensor(comm_spec_, *client_, selector, range));
  } else if (ctx_type.find(CONTEXT_TYPE_JAVA_PIE_PROJECTED) !=
             std::string::npos) {
    std::vector<std::string> outer_and_inner;
    boost::split(outer_and_inner, ctx_type, boost::is_any_of(":"));
    if (outer_and_inner.size() != 2) {
      RETURN_GS_ERROR(
          vineyard::ErrorCode::kIllegalStateError,
          "Unsupported java projected context type: " + std::string(ctx_type));
    }
    auto wrapper = std::dynamic_pointer_cast<IJavaPIEProjectedContextWrapper>(
        base_ctx_wrapper);
    BOOST_LEAF_AUTO(selector, Selector::parse(s_selector));
    BOOST_LEAF_ASSIGN(
        id, wrapper->ToVineyardTensor(comm_spec_, *client_, selector, range));
#endif
  } else {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                    "Unsupported context type: " + std::string(ctx_type));
  }

  auto s_id = vineyard::ObjectIDToString(id);
  VY_OK_OR_RAISE(client_->PutName(id, s_id));

  return toJson({{"object_id", s_id}});
}

// Writes the selected columns of a context into a vineyard dataframe, names
// the object after its id string and returns {"object_id": <id>} as JSON.
bl::result<std::string> GrapeInstance::contextToVineyardDataFrame(
    const rpc::GSParams& params) {
  std::string s_selector;
  std::pair<std::string, std::string> range;
  std::shared_ptr<IContextWrapper> base_ctx_wrapper;

  BOOST_LEAF_CHECK(
      getContextDetails(params, &s_selector, &range, &base_ctx_wrapper));

  // every branch below either assigns `id` or returns an error
  vineyard::ObjectID id;
  auto ctx_type = base_ctx_wrapper->context_type();

  if (ctx_type == CONTEXT_TYPE_TENSOR) {
    auto wrapper =
        std::dynamic_pointer_cast<ITensorContextWrapper>(base_ctx_wrapper);
    BOOST_LEAF_ASSIGN(id, wrapper->ToVineyardDataframe(comm_spec_, *client_));
  } else if (ctx_type == CONTEXT_TYPE_VERTEX_DATA) {
    auto vd_ctx_wrapper =
        std::dynamic_pointer_cast<IVertexDataContextWrapper>(base_ctx_wrapper);
    BOOST_LEAF_AUTO(selectors, Selector::ParseSelectors(s_selector));
    BOOST_LEAF_ASSIGN(id, vd_ctx_wrapper->ToVineyardDataframe(
                              comm_spec_, *client_, selectors, range));
  } else if (ctx_type == CONTEXT_TYPE_LABELED_VERTEX_DATA) {
    auto vd_ctx_wrapper =
        std::dynamic_pointer_cast<ILabeledVertexDataContextWrapper>(
            base_ctx_wrapper);
    BOOST_LEAF_AUTO(selectors, LabeledSelector::ParseSelectors(s_selector));
    BOOST_LEAF_ASSIGN(id, vd_ctx_wrapper->ToVineyardDataframe(
                              comm_spec_, *client_, selectors, range));
  } else if (ctx_type == CONTEXT_TYPE_VERTEX_PROPERTY) {
    auto vd_ctx_wrapper =
        std::dynamic_pointer_cast<IVertexPropertyContextWrapper>(
            base_ctx_wrapper);
    BOOST_LEAF_AUTO(selectors, Selector::ParseSelectors(s_selector));
    BOOST_LEAF_ASSIGN(id, vd_ctx_wrapper->ToVineyardDataframe(
                              comm_spec_, *client_, selectors, range));
  } else if (ctx_type == CONTEXT_TYPE_LABELED_VERTEX_PROPERTY) {
    auto vd_ctx_wrapper =
        std::dynamic_pointer_cast<ILabeledVertexPropertyContextWrapper>(
            base_ctx_wrapper);
    BOOST_LEAF_AUTO(selectors, LabeledSelector::ParseSelectors(s_selector));
    BOOST_LEAF_ASSIGN(id, vd_ctx_wrapper->ToVineyardDataframe(
                              comm_spec_, *client_, selectors, range));
#ifdef ENABLE_JAVA_SDK
  } else if (ctx_type.find(CONTEXT_TYPE_JAVA_PIE_PROPERTY) !=
             std::string::npos) {
    std::vector<std::string> outer_and_inner;
    boost::split(outer_and_inner, ctx_type, boost::is_any_of(":"));
    if (outer_and_inner.size() != 2) {
      RETURN_GS_ERROR(
          vineyard::ErrorCode::kIllegalStateError,
          "Unsupported java property context type: " + std::string(ctx_type));
    }
    auto vd_ctx_wrapper =
        std::dynamic_pointer_cast<IJavaPIEPropertyContextWrapper>(
            base_ctx_wrapper);
    BOOST_LEAF_AUTO(selectors, LabeledSelector::ParseSelectors(s_selector));
    BOOST_LEAF_ASSIGN(id, vd_ctx_wrapper->ToVineyardDataframe(
                              comm_spec_, *client_, selectors, range));
  } else if (ctx_type.find(CONTEXT_TYPE_JAVA_PIE_PROJECTED) !=
             std::string::npos) {
    std::vector<std::string> outer_and_inner;
    boost::split(outer_and_inner, ctx_type, boost::is_any_of(":"));
    if (outer_and_inner.size() != 2) {
      RETURN_GS_ERROR(
          vineyard::ErrorCode::kIllegalStateError,
          "Unsupported java projected context type: " + std::string(ctx_type));
    }
    auto vd_ctx_wrapper =
        std::dynamic_pointer_cast<IJavaPIEProjectedContextWrapper>(
            base_ctx_wrapper);
    BOOST_LEAF_AUTO(selectors, Selector::ParseSelectors(s_selector));
    BOOST_LEAF_ASSIGN(id, vd_ctx_wrapper->ToVineyardDataframe(
                              comm_spec_, *client_, selectors, range));
#endif
  } else {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                    "Unsupported context type: " + std::string(ctx_type));
  }

  auto s_id = vineyard::ObjectIDToString(id);
  VY_OK_OR_RAISE(client_->PutName(id, s_id));

  return toJson({{"object_id", s_id}});
}

// Writes the selected columns of a context as an Arrow table to the location
// given by the FD param, via a vineyard IO adaptor. A vertex range is not
// supported here and is ignored (with a warning).
bl::result<void> GrapeInstance::outputContext(const rpc::GSParams& params) {
  std::string s_selector;
  std::pair<std::string, std::string> range;
  std::shared_ptr<IContextWrapper> base_ctx_wrapper;

  BOOST_LEAF_CHECK(
      getContextDetails(params, &s_selector, &range, &base_ctx_wrapper));
  // either bound alone is ignored too, so warn when any bound is given
  if (!range.first.empty() || !range.second.empty()) {
    LOG(WARNING)
        << "Specifying vertex range for output is not supported and ignored";
  }

  BOOST_LEAF_AUTO(location, params.Get<std::string>(rpc::FD));

  auto ctx_type = base_ctx_wrapper->context_type();

  std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>> arrays;
  if (ctx_type == CONTEXT_TYPE_VERTEX_DATA) {
    auto wrapper =
        std::dynamic_pointer_cast<IVertexDataContextWrapper>(base_ctx_wrapper);
    BOOST_LEAF_AUTO(selectors, Selector::ParseSelectors(s_selector));
    BOOST_LEAF_ASSIGN(arrays, wrapper->ToArrowArrays(comm_spec_, selectors));
  } else if (ctx_type == CONTEXT_TYPE_LABELED_VERTEX_DATA) {
    auto wrapper = std::dynamic_pointer_cast<ILabeledVertexDataContextWrapper>(
        base_ctx_wrapper);
    BOOST_LEAF_AUTO(selectors, LabeledSelector::ParseSelectors(s_selector));
    BOOST_LEAF_AUTO(arrays_map, wrapper->ToArrowArrays(comm_spec_, selectors));
    // flatten the per-label column lists into one column list
    for (auto& pair : arrays_map) {
      arrays.insert(arrays.end(), pair.second.begin(), pair.second.end());
    }
  } else if (ctx_type == CONTEXT_TYPE_VERTEX_PROPERTY) {
    auto wrapper = std::dynamic_pointer_cast<IVertexPropertyContextWrapper>(
        base_ctx_wrapper);
    BOOST_LEAF_AUTO(selectors, Selector::ParseSelectors(s_selector));
    BOOST_LEAF_ASSIGN(arrays, wrapper->ToArrowArrays(comm_spec_, selectors));
  } else if (ctx_type == CONTEXT_TYPE_LABELED_VERTEX_PROPERTY) {
    auto wrapper =
        std::dynamic_pointer_cast<ILabeledVertexPropertyContextWrapper>(
            base_ctx_wrapper);
    BOOST_LEAF_AUTO(selectors, LabeledSelector::ParseSelectors(s_selector));
    BOOST_LEAF_AUTO(arrays_map, wrapper->ToArrowArrays(comm_spec_, selectors));
    for (auto& pair : arrays_map) {
      arrays.insert(arrays.end(), pair.second.begin(), pair.second.end());
    }
#ifdef ENABLE_JAVA_SDK
  } else if (ctx_type.find(CONTEXT_TYPE_JAVA_PIE_PROPERTY) !=
             std::string::npos) {
    std::vector<std::string> outer_and_inner;
    boost::split(outer_and_inner, ctx_type, boost::is_any_of(":"));
    if (outer_and_inner.size() != 2) {
      RETURN_GS_ERROR(
          vineyard::ErrorCode::kIllegalStateError,
          "Unsupported java property context type: " + std::string(ctx_type));
    }
    auto wrapper = std::dynamic_pointer_cast<IJavaPIEPropertyContextWrapper>(
        base_ctx_wrapper);
    BOOST_LEAF_AUTO(selectors, LabeledSelector::ParseSelectors(s_selector));
    BOOST_LEAF_AUTO(arrays_map, wrapper->ToArrowArrays(comm_spec_, selectors));
    for (auto& pair : arrays_map) {
      arrays.insert(arrays.end(), pair.second.begin(), pair.second.end());
    }
  } else if (ctx_type.find(CONTEXT_TYPE_JAVA_PIE_PROJECTED) !=
             std::string::npos) {
    std::vector<std::string> outer_and_inner;
    boost::split(outer_and_inner, ctx_type, boost::is_any_of(":"));
    if (outer_and_inner.size() != 2) {
      RETURN_GS_ERROR(
          vineyard::ErrorCode::kInvalidValueError,
          "Unsupported java projected context type: " + std::string(ctx_type));
    }
    auto wrapper = std::dynamic_pointer_cast<IJavaPIEProjectedContextWrapper>(
        base_ctx_wrapper);
    BOOST_LEAF_AUTO(selectors, Selector::ParseSelectors(s_selector));
    BOOST_LEAF_ASSIGN(arrays, wrapper->ToArrowArrays(comm_spec_, selectors));
#endif
  } else {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                    "Unsupported context type: " + std::string(ctx_type));
  }

  std::vector<std::shared_ptr<arrow::Array>> arrays_vec;
  std::vector<std::shared_ptr<arrow::Field>> fields_vec;
  arrays_vec.reserve(arrays.size());
  fields_vec.reserve(arrays.size());
  for (auto& pair : arrays) {
    arrays_vec.push_back(pair.second);
    fields_vec.push_back(
        std::make_shared<arrow::Field>(pair.first, pair.second->type()));
  }
  auto schema = std::make_shared<arrow::Schema>(fields_vec);
  auto table = arrow::Table::Make(schema, arrays_vec);
  VLOG(2) << "Output table schema: " << table->schema()->ToString();

  auto io_adaptor = vineyard::IOFactory::CreateIOAdaptor(location);
  if (io_adaptor == nullptr) {
    RETURN_GS_ERROR(vineyard::ErrorCode::kIOError,
                    "Cannot find a supported adaptor for " + location);
  }
  ARROW_OK_OR_RAISE(io_adaptor->Open("w"));
  ARROW_OK_OR_RAISE(io_adaptor->WriteTable(table));
  ARROW_OK_OR_RAISE(io_adaptor->Close());
  return {};
}

// Adds the selected context columns to the ArrowFragment GRAPH_NAME as new
// properties, registers the resulting graph under a new name and returns its
// def.
bl::result<rpc::graph::GraphDefPb> GrapeInstance::addColumn(
    const rpc::GSParams& params) {
  BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
  BOOST_LEAF_AUTO(context_key, params.Get<std::string>(rpc::CONTEXT_KEY));
  BOOST_LEAF_AUTO(s_selector, params.Get<std::string>(rpc::SELECTOR));
  BOOST_LEAF_AUTO(
      frag_wrapper,
      object_manager_.GetObject<ILabeledFragmentWrapper>(graph_name));

  if (frag_wrapper->graph_def().graph_type() != rpc::graph::ARROW_PROPERTY) {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "AddColumn is only available for ArrowFragment");
  }

  BOOST_LEAF_AUTO(ctx_wrapper,
                  object_manager_.GetObject<IContextWrapper>(context_key));
  std::string dst_graph_name = "graph_" + generateId();

  BOOST_LEAF_AUTO(new_frag_wrapper,
                  frag_wrapper->AddColumn(comm_spec_, dst_graph_name,
                                          ctx_wrapper, s_selector));
  BOOST_LEAF_CHECK(object_manager_.PutObject(new_frag_wrapper));
  return new_frag_wrapper->graph_def();
}

// Converts GRAPH_NAME between ArrowFragment and DynamicFragment (direction
// chosen by the source graph type) using the PropertyGraphUtils registered
// under TYPE_SIGNATURE, and registers the result under a new name.
bl::result<rpc::graph::GraphDefPb> GrapeInstance::convertGraph(
    const rpc::GSParams& params) {
  BOOST_LEAF_AUTO(src_graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
  BOOST_LEAF_AUTO(dst_graph_type,
                  params.Get<rpc::graph::GraphTypePb>(rpc::DST_GRAPH_TYPE));
  BOOST_LEAF_AUTO(type_sig, params.Get<std::string>(rpc::TYPE_SIGNATURE));

  std::string dst_graph_name = "graph_" + generateId();

  VLOG(1) << "Converting graph, src graph name: " << src_graph_name
          << ", dst graph name: " << dst_graph_name
          << ", dst graph type: "
          << rpc::graph::GraphTypePb_Name(dst_graph_type)
          << ", type_sig: " << type_sig;

  BOOST_LEAF_AUTO(g_utils,
                  object_manager_.GetObject<PropertyGraphUtils>(type_sig));
  BOOST_LEAF_AUTO(src_frag_wrapper,
                  object_manager_.GetObject<IFragmentWrapper>(src_graph_name));

  auto src_graph_type = src_frag_wrapper->graph_def().graph_type();

  if (src_graph_type == rpc::graph::ARROW_PROPERTY) {
    BOOST_LEAF_AUTO(default_label_id,
                    params.Get<int64_t>(rpc::DEFAULT_LABEL_ID));
    BOOST_LEAF_AUTO(dst_graph_wrapper,
                    g_utils->ToDynamicFragment(comm_spec_, src_frag_wrapper,
                                               dst_graph_name,
                                               default_label_id));
    BOOST_LEAF_CHECK(object_manager_.PutObject(dst_graph_wrapper));
    return dst_graph_wrapper->graph_def();
  } else if (src_graph_type == rpc::graph::DYNAMIC_PROPERTY) {
    BOOST_LEAF_AUTO(dst_graph_wrapper,
                    g_utils->ToArrowFragment(*client_, comm_spec_,
                                             src_frag_wrapper, dst_graph_name));
    BOOST_LEAF_CHECK(object_manager_.PutObject(dst_graph_wrapper));
    return dst_graph_wrapper->graph_def();
  }
  RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                  "Unsupported conversion direction from " +
                      rpc::graph::GraphTypePb_Name(src_graph_type) + " to " +
                      rpc::graph::GraphTypePb_Name(dst_graph_type));
}

// Copies GRAPH_NAME (COPY_TYPE is passed through to the wrapper) and
// registers the copy under a new name.
bl::result<rpc::graph::GraphDefPb> GrapeInstance::copyGraph(
    const rpc::GSParams& params) {
  BOOST_LEAF_AUTO(src_graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
  BOOST_LEAF_AUTO(copy_type, params.Get<std::string>(rpc::COPY_TYPE));

  BOOST_LEAF_AUTO(src_wrapper,
                  object_manager_.GetObject<IFragmentWrapper>(src_graph_name));
  std::string dst_graph_name = "graph_" + generateId();

  VLOG(1) << "Copy graph from " << src_graph_name
          << ", graph name: " << dst_graph_name;

  BOOST_LEAF_AUTO(dst_wrapper, src_wrapper->CopyGraph(
                                   comm_spec_, dst_graph_name, copy_type));
  BOOST_LEAF_CHECK(object_manager_.PutObject(dst_wrapper));
  return dst_wrapper->graph_def();
}

// Builds a directed copy of GRAPH_NAME and registers it under a new name.
bl::result<rpc::graph::GraphDefPb> GrapeInstance::toDirected(
    const rpc::GSParams& params) {
  BOOST_LEAF_AUTO(src_graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
  BOOST_LEAF_AUTO(src_wrapper,
                  object_manager_.GetObject<IFragmentWrapper>(src_graph_name));
  std::string dst_graph_name = "graph_" + generateId();

  VLOG(1) << "Convert to directed graph from " << src_graph_name
          << ", graph name: " << dst_graph_name;

  BOOST_LEAF_AUTO(dst_wrapper,
                  src_wrapper->ToDirected(comm_spec_, dst_graph_name));
  BOOST_LEAF_CHECK(object_manager_.PutObject(dst_wrapper));
  return dst_wrapper->graph_def();
}

// Builds an undirected copy of GRAPH_NAME and registers it under a new name.
bl::result<rpc::graph::GraphDefPb> GrapeInstance::toUnDirected(
    const rpc::GSParams& params) {
  BOOST_LEAF_AUTO(src_graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
  BOOST_LEAF_AUTO(src_wrapper,
                  object_manager_.GetObject<IFragmentWrapper>(src_graph_name));
  std::string dst_graph_name = "graph_" + generateId();

  VLOG(1) << "Convert to undirected graph from " << src_graph_name
          << ", graph name: " << dst_graph_name;

  BOOST_LEAF_AUTO(dst_wrapper,
                  src_wrapper->ToUndirected(comm_spec_, dst_graph_name));
  BOOST_LEAF_CHECK(object_manager_.PutObject(dst_wrapper));
  return dst_wrapper->graph_def();
}

#ifdef NETWORKX
bl::result<rpc::graph::GraphDefPb> GrapeInstance::induceSubGraph(
    const rpc::GSParams& params) {
  BOOST_LEAF_AUTO(src_graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
  BOOST_LEAF_AUTO(src_wrapper,
object_manager_.GetObject<IFragmentWrapper>(src_graph_name)); std::string sub_graph_name = "graph_" + generateId(); VLOG(1) << "Inducing subgraph from " << src_graph_name << ", graph name: " << sub_graph_name; std::vector<dynamic::Value> induced_vertices; std::vector<std::pair<dynamic::Value, dynamic::Value>> induced_edges; if (params.HasKey(rpc::NODES)) { // induce subgraph from nodes. BOOST_LEAF_AUTO(nodes_json, params.Get<std::string>(rpc::NODES)); dynamic::Value nodes; dynamic::Parse(nodes_json, nodes); induced_vertices.reserve(nodes.Size()); for (auto& v : nodes) { induced_vertices.push_back(dynamic::Value(v)); } } else if (params.HasKey(rpc::EDGES)) { // induce subgraph from edges. BOOST_LEAF_AUTO(edges_json, params.Get<std::string>(rpc::EDGES)); dynamic::Value edges; dynamic::Parse(edges_json, edges); induced_edges.reserve(edges.Size()); for (const auto& e : edges) { induced_vertices.push_back(dynamic::Value(e[0])); induced_vertices.push_back(dynamic::Value(e[1])); induced_edges.emplace_back(std::move(e[0]), std::move(e[1])); } } auto fragment = std::static_pointer_cast<DynamicFragment>(src_wrapper->fragment()); auto sub_vm_ptr = std::make_shared<typename DynamicFragment::vertex_map_t>(comm_spec_); sub_vm_ptr->Init(); { typename DynamicFragment::vertex_map_t::partitioner_t partitioner( comm_spec_.fnum()); sub_vm_ptr->SetPartitioner(partitioner); } grape::Communicator comm; comm.InitCommunicator(comm_spec_.comm()); typename DynamicFragment::vid_t gid; for (const auto& v : induced_vertices) { bool alive_in_frag = fragment->HasNode(v); bool alive = false; comm.Sum(alive_in_frag, alive); if (alive) { sub_vm_ptr->AddVertex(v, gid); } } auto sub_graph_def = src_wrapper->graph_def(); sub_graph_def.set_key(sub_graph_name); auto sub_frag = std::make_shared<DynamicFragment>(sub_vm_ptr); sub_frag->InduceSubgraph(fragment, induced_vertices, induced_edges); gs::rpc::graph::MutableGraphInfoPb graph_info; if (sub_graph_def.has_extension()) { 
sub_graph_def.extension().UnpackTo(&graph_info); } graph_info.set_property_schema_json( dynamic::Stringify(sub_frag->GetSchema())); sub_graph_def.mutable_extension()->PackFrom(graph_info); auto wrapper = std::make_shared<FragmentWrapper<DynamicFragment>>( sub_graph_name, sub_graph_def, sub_frag); BOOST_LEAF_CHECK(object_manager_.PutObject(wrapper)); return wrapper->graph_def(); } #endif // NETWORKX bl::result<void> GrapeInstance::clearGraph(const rpc::GSParams& params) { #ifdef NETWORKX BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME)); BOOST_LEAF_AUTO(wrapper, object_manager_.GetObject<IFragmentWrapper>(graph_name)); auto graph_type = wrapper->graph_def().graph_type(); if (graph_type != rpc::graph::DYNAMIC_PROPERTY) { RETURN_GS_ERROR( vineyard::ErrorCode::kInvalidValueError, "GraphType must be DYNAMIC_PROPERTY, the origin graph type is: " + rpc::graph::GraphTypePb_Name(graph_type) + ", graph id: " + graph_name); } auto vm_ptr = std::shared_ptr<DynamicFragment::vertex_map_t>( new DynamicFragment::vertex_map_t(comm_spec_)); vm_ptr->Init(); { typename DynamicFragment::vertex_map_t::partitioner_t partitioner( comm_spec_.fnum()); vm_ptr->SetPartitioner(partitioner); } auto fragment = std::static_pointer_cast<DynamicFragment>(wrapper->fragment()); fragment->ClearGraph(vm_ptr); #else RETURN_GS_ERROR(vineyard::ErrorCode::kUnimplementedMethod, "GraphScope is built with NETWORKX=OFF, please recompile it " "with NETWORKX=ON"); #endif // NETWORKX return {}; } bl::result<void> GrapeInstance::clearEdges(const rpc::GSParams& params) { #ifdef NETWORKX BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME)); BOOST_LEAF_AUTO(wrapper, object_manager_.GetObject<IFragmentWrapper>(graph_name)); auto graph_type = wrapper->graph_def().graph_type(); if (graph_type != rpc::graph::DYNAMIC_PROPERTY) { RETURN_GS_ERROR( vineyard::ErrorCode::kInvalidValueError, "GraphType must be DYNAMIC_PROPERTY, the origin graph type is: " + 
rpc::graph::GraphTypePb_Name(graph_type) + ", graph id: " + graph_name); } auto fragment = std::static_pointer_cast<DynamicFragment>(wrapper->fragment()); fragment->ClearEdges(); #else RETURN_GS_ERROR(vineyard::ErrorCode::kUnimplementedMethod, "GraphScope is built with NETWORKX=OFF, please recompile it " "with NETWORKX=ON"); #endif // NETWORKX return {}; } bl::result<rpc::graph::GraphDefPb> GrapeInstance::createGraphView( const rpc::GSParams& params) { #ifdef NETWORKX std::string view_id = "graph_view_" + generateId(); BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME)); BOOST_LEAF_AUTO(view_type, params.Get<std::string>(rpc::VIEW_TYPE)); VLOG(1) << "Creating graph view, dst graph name: " << view_id << ", view type: " << view_type; BOOST_LEAF_AUTO(wrapper, object_manager_.GetObject<IFragmentWrapper>(graph_name)); BOOST_LEAF_AUTO(view_wrapper, wrapper->CreateGraphView(comm_spec_, view_id, view_type)); BOOST_LEAF_CHECK(object_manager_.PutObject(view_wrapper)); return view_wrapper->graph_def(); #else RETURN_GS_ERROR(vineyard::ErrorCode::kUnimplementedMethod, "GraphScope is built with NETWORKX=OFF, please recompile it " "with NETWORKX=ON"); #endif // NETWORKX } bl::result<rpc::graph::GraphDefPb> GrapeInstance::addLabelsToGraph( const rpc::GSParams& params) { BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME)); BOOST_LEAF_AUTO( src_wrapper, object_manager_.GetObject<ILabeledFragmentWrapper>(graph_name)); if (src_wrapper->graph_def().graph_type() != rpc::graph::ARROW_PROPERTY) { RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, "AddLabelsToGraph is only available for ArrowFragment"); } auto src_frag_id = std::static_pointer_cast<vineyard::Object>(src_wrapper->fragment())->id(); BOOST_LEAF_AUTO(type_sig, params.Get<std::string>(rpc::TYPE_SIGNATURE)); BOOST_LEAF_AUTO(graph_utils, object_manager_.GetObject<PropertyGraphUtils>(type_sig)); std::string dst_graph_name = "graph_" + generateId(); BOOST_LEAF_AUTO(dst_wrapper, 
graph_utils->AddLabelsToGraph( src_frag_id, comm_spec_, *client_, dst_graph_name, params)); BOOST_LEAF_CHECK(object_manager_.PutObject(dst_wrapper)); return dst_wrapper->graph_def(); } bl::result<rpc::graph::GraphDefPb> GrapeInstance::consolidateColumns( const rpc::GSParams& params) { BOOST_LEAF_AUTO(src_graph_name, params.Get<std::string>(rpc::GRAPH_NAME)); BOOST_LEAF_AUTO(label, params.Get<std::string>(rpc::CONSOLIDATE_COLUMNS_LABEL)); BOOST_LEAF_AUTO(columns, params.Get<std::string>(rpc::CONSOLIDATE_COLUMNS_COLUMNS)); BOOST_LEAF_AUTO(result_column, params.Get<std::string>( rpc::CONSOLIDATE_COLUMNS_RESULT_COLUMN)); BOOST_LEAF_AUTO( src_wrapper, object_manager_.GetObject<ILabeledFragmentWrapper>(src_graph_name)); if (src_wrapper->graph_def().graph_type() != rpc::graph::ARROW_PROPERTY) { RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, "ConsolidateColumns is only available for ArrowFragment"); } std::string dst_graph_name = "graph_" + generateId(); VLOG(1) << "Consolidate columns from " << src_graph_name << ", graph name: " << dst_graph_name << ":" << "\nlabel = " << label << "\ncolumns = " << columns << "\nresult_column = " << result_column; BOOST_LEAF_AUTO(dst_wrapper, src_wrapper->ConsolidateColumns( comm_spec_, dst_graph_name, label, columns, result_column)); BOOST_LEAF_CHECK(object_manager_.PutObject(dst_wrapper)); return dst_wrapper->graph_def(); } bl::result<std::shared_ptr<grape::InArchive>> GrapeInstance::graphToNumpy( const rpc::GSParams& params) { std::pair<std::string, std::string> range; BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME)); BOOST_LEAF_AUTO(s_selector, params.Get<std::string>(rpc::SELECTOR)); BOOST_LEAF_AUTO( wrapper, object_manager_.GetObject<ILabeledFragmentWrapper>(graph_name)); if (params.HasKey(rpc::VERTEX_RANGE)) { BOOST_LEAF_AUTO(range_in_json, params.Get<std::string>(rpc::VERTEX_RANGE)); range = parseRange(range_in_json); } BOOST_LEAF_AUTO(selector, LabeledSelector::parse(s_selector)); return 
wrapper->ToNdArray(comm_spec_, selector, range); } bl::result<std::shared_ptr<grape::InArchive>> GrapeInstance::graphToDataframe( const rpc::GSParams& params) { BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME)); BOOST_LEAF_AUTO( wrapper, object_manager_.GetObject<ILabeledFragmentWrapper>(graph_name)); std::pair<std::string, std::string> range; if (params.HasKey(rpc::VERTEX_RANGE)) { BOOST_LEAF_AUTO(range_in_json, params.Get<std::string>(rpc::VERTEX_RANGE)); range = parseRange(range_in_json); } BOOST_LEAF_AUTO(s_selector, params.Get<std::string>(rpc::SELECTOR)); BOOST_LEAF_AUTO(selectors, LabeledSelector::ParseSelectors(s_selector)); return wrapper->ToDataframe(comm_spec_, selectors, range); } bl::result<void> GrapeInstance::registerGraphType(const rpc::GSParams& params) { BOOST_LEAF_AUTO(graph_type, params.Get<rpc::graph::GraphTypePb>(rpc::GRAPH_TYPE)); BOOST_LEAF_AUTO(type_sig, params.Get<std::string>(rpc::TYPE_SIGNATURE)); BOOST_LEAF_AUTO(lib_path, params.Get<std::string>(rpc::GRAPH_LIBRARY_PATH)); VLOG(1) << "Registering Graph, graph type: " << rpc::graph::GraphTypePb_Name(graph_type) << ", Type signature: " << type_sig << ", lib path: " << lib_path; if (object_manager_.HasObject(type_sig)) { VLOG(1) << "Graph already registered, signature is: " << type_sig; return {}; } if (graph_type == rpc::graph::ARROW_PROPERTY) { auto utils = std::make_shared<PropertyGraphUtils>(type_sig, lib_path); BOOST_LEAF_CHECK(utils->Init()); return object_manager_.PutObject(utils); } else if (graph_type == rpc::graph::ARROW_PROJECTED || graph_type == rpc::graph::DYNAMIC_PROJECTED || graph_type == rpc::graph::ARROW_FLATTENED) { auto projector = std::make_shared<Projector>(type_sig, lib_path); BOOST_LEAF_CHECK(projector->Init()); return object_manager_.PutObject(projector); } else { RETURN_GS_ERROR( vineyard::ErrorCode::kInvalidValueError, "Unsupported graph type: " + rpc::graph::GraphTypePb_Name(graph_type)); } } bl::result<std::shared_ptr<DispatchResult>> 
GrapeInstance::OnReceive( std::shared_ptr<CommandDetail> cmd) { auto r = std::make_shared<DispatchResult>(comm_spec_.worker_id()); rpc::GSParams params(cmd->params, cmd->large_attr); switch (cmd->type) { case rpc::CREATE_GRAPH: { BOOST_LEAF_AUTO(graph_def, loadGraph(params)); r->set_graph_def(graph_def); break; } case rpc::CREATE_APP: { // do nothing break; } case rpc::BIND_APP: { BOOST_LEAF_AUTO(app_name, loadApp(params)); r->set_data(app_name); break; } case rpc::RUN_APP: { BOOST_LEAF_AUTO(context_key, query(params, cmd->query_args)); r->set_data(context_key); break; } case rpc::UNLOAD_APP: { BOOST_LEAF_CHECK(unloadApp(params)); break; } case rpc::UNLOAD_GRAPH: { BOOST_LEAF_CHECK(unloadGraph(params)); break; } case rpc::UNLOAD_CONTEXT: { BOOST_LEAF_CHECK(unloadContext(params)); break; } case rpc::REPORT_GRAPH: { BOOST_LEAF_AUTO(arc, reportGraph(params)); r->set_data(*arc, DispatchResult::AggregatePolicy::kPickFirstNonEmpty, true); break; } case rpc::ARCHIVE_GRAPH: { BOOST_LEAF_CHECK(archiveGraph(params)); break; } case rpc::PROJECT_GRAPH: { BOOST_LEAF_AUTO(graph_def, projectGraph(params)); r->set_graph_def(graph_def); break; } case rpc::PROJECT_TO_SIMPLE: { BOOST_LEAF_AUTO(graph_def, projectToSimple(params)); r->set_graph_def(graph_def); break; } case rpc::MODIFY_VERTICES: { #ifdef NETWORKX BOOST_LEAF_AUTO(graph_def, modifyVertices(params)); r->set_graph_def(graph_def); #else RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, "GraphScope is built with NETWORKX=OFF, please recompile " "it with NETWORKX=ON"); #endif break; } case rpc::MODIFY_EDGES: { #ifdef NETWORKX BOOST_LEAF_AUTO(graph_def, modifyEdges(params)); r->set_graph_def(graph_def); #else RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, "GraphScope is built with NETWORKX=OFF, please recompile " "it with NETWORKX=ON"); #endif break; } case rpc::TRANSFORM_GRAPH: { #ifdef NETWORKX BOOST_LEAF_AUTO(graph_def, convertGraph(params)); r->set_graph_def(graph_def); #else 
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, "GraphScope is built with NETWORKX=OFF, please recompile " "it with NETWORKX=ON"); #endif break; } case rpc::COPY_GRAPH: { BOOST_LEAF_AUTO(graph_def, copyGraph(params)); r->set_graph_def(graph_def); break; } case rpc::TO_DIRECTED: { #ifdef NETWORKX BOOST_LEAF_AUTO(graph_def, toDirected(params)); r->set_graph_def(graph_def); #else RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, "GraphScope is built with NETWORKX=OFF, please recompile " "it with NETWORKX=ON"); #endif break; } case rpc::TO_UNDIRECTED: { #ifdef NETWORKX BOOST_LEAF_AUTO(graph_def, toUnDirected(params)); r->set_graph_def(graph_def); #else RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, "GraphScope is built with NETWORKX=OFF, please recompile " "it with NETWORKX=ON"); #endif break; } case rpc::INDUCE_SUBGRAPH: { #ifdef NETWORKX BOOST_LEAF_AUTO(graph_def, induceSubGraph(params)); r->set_graph_def(graph_def); #else RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, "GraphScope is built with NETWORKX=OFF, please recompile " "it with NETWORKX=ON"); #endif break; } case rpc::CLEAR_GRAPH: { #ifdef NETWORKX BOOST_LEAF_CHECK(clearGraph(params)); #else RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, "GraphScope is built with NETWORKX=OFF, please recompile " "it with NETWORKX=ON"); #endif break; } case rpc::CLEAR_EDGES: { #ifdef NETWORKX BOOST_LEAF_CHECK(clearEdges(params)); #else RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, "GraphScope is built with NETWORKX=OFF, please recompile " "it with NETWORKX=ON"); #endif break; } case rpc::VIEW_GRAPH: { #ifdef NETWORKX BOOST_LEAF_AUTO(graph_def, createGraphView(params)); r->set_graph_def(graph_def); #else RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, "GraphScope is built with NETWORKX=OFF, please recompile " "it with NETWORKX=ON"); #endif break; } case rpc::ADD_LABELS: { BOOST_LEAF_AUTO(graph_def, addLabelsToGraph(params)); 
r->set_graph_def(graph_def); break; } case rpc::CONSOLIDATE_COLUMNS: { BOOST_LEAF_AUTO(graph_def, consolidateColumns(params)); r->set_graph_def(graph_def); break; } case rpc::CONTEXT_TO_NUMPY: { BOOST_LEAF_AUTO(arc, contextToNumpy(params)); r->set_data(*arc, DispatchResult::AggregatePolicy::kPickFirst, true); break; } case rpc::CONTEXT_TO_DATAFRAME: { BOOST_LEAF_AUTO(arc, contextToDataframe(params)); r->set_data(*arc, DispatchResult::AggregatePolicy::kPickFirst, true); break; } case rpc::TO_VINEYARD_TENSOR: { BOOST_LEAF_AUTO(vy_obj_id_in_json, contextToVineyardTensor(params)); r->set_data(vy_obj_id_in_json); break; } case rpc::TO_VINEYARD_DATAFRAME: { BOOST_LEAF_AUTO(vy_obj_id_in_json, contextToVineyardDataFrame(params)); r->set_data(vy_obj_id_in_json); break; } case rpc::OUTPUT: { BOOST_LEAF_CHECK(outputContext(params)); break; } case rpc::GET_CONTEXT_DATA: { BOOST_LEAF_AUTO(context_json, getContextData(params)); r->set_data(context_json, DispatchResult::AggregatePolicy::kPickFirstNonEmpty); break; } case rpc::ADD_COLUMN: { BOOST_LEAF_AUTO(graph_def, addColumn(params)); r->set_graph_def(graph_def); break; } case rpc::GRAPH_TO_NUMPY: { BOOST_LEAF_AUTO(arc, graphToNumpy(params)); r->set_data(*arc, DispatchResult::AggregatePolicy::kPickFirst, true); break; } case rpc::GRAPH_TO_DATAFRAME: { BOOST_LEAF_AUTO(arc, graphToDataframe(params)); r->set_data(*arc, DispatchResult::AggregatePolicy::kPickFirst, true); break; } case rpc::REGISTER_GRAPH_TYPE: { BOOST_LEAF_CHECK(registerGraphType(params)); break; } case rpc::GET_ENGINE_CONFIG: { EngineConfig conf; #ifdef NETWORKX conf.networkx = "ON"; #else conf.networkx = "OFF"; #endif #ifdef ENABLE_JAVA_SDK conf.enable_java_sdk = "ON"; #else conf.enable_java_sdk = "OFF"; #endif conf.vineyard_socket = client_->IPCSocket(); conf.vineyard_rpc_endpoint = client_->RPCEndpoint(); r->set_data(conf.ToJsonString(), DispatchResult::AggregatePolicy::kPickFirst); break; } default: RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError, 
"Unsupported command type: " + std::to_string(cmd->type)); } return r; } } // namespace gs