analytical_engine/core/object/fragment_wrapper.h (1,126 lines of code) (raw):
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef ANALYTICAL_ENGINE_CORE_OBJECT_FRAGMENT_WRAPPER_H_
#define ANALYTICAL_ENGINE_CORE_OBJECT_FRAGMENT_WRAPPER_H_
#include <mpi.h>
#include <algorithm>
#include <iterator>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#ifdef ENABLE_JAVA_SDK
#include "boost/algorithm/string.hpp"
#include "boost/algorithm/string/split.hpp"
#endif
#include "boost/leaf/error.hpp"
#include "boost/leaf/result.hpp"
#include "grape/fragment/immutable_edgecut_fragment.h"
// #include "grape/fragment/immutable_vertexcut_fragment.h"
#include "grape/serialization/in_archive.h"
#include "grape/worker/comm_spec.h"
#include "vineyard/client/client.h"
#include "vineyard/client/ds/object_meta.h"
#include "vineyard/common/util/json.h"
#include "vineyard/common/util/status.h"
#include "vineyard/common/util/uuid.h"
#include "vineyard/graph/fragment/arrow_fragment.h"
#include "vineyard/graph/fragment/arrow_fragment_group.h"
#include "vineyard/graph/fragment/graph_schema.h"
#include "vineyard/graph/loader/fragment_loader_utils.h"
#include "vineyard/graph/utils/context_protocols.h"
#include "core/context/i_context.h"
#include "core/context/labeled_vertex_property_context.h"
#include "core/context/selector.h"
#include "core/context/vertex_data_context.h"
#include "core/context/vertex_property_context.h"
#include "core/error.h"
#include "core/fragment/arrow_flattened_fragment.h"
#include "core/fragment/arrow_projected_fragment.h"
#include "core/fragment/dynamic_fragment.h"
#include "core/fragment/dynamic_projected_fragment.h"
#include "core/fragment/fragment_reporter.h"
#include "core/object/i_fragment_wrapper.h"
#include "core/server/rpc_utils.h"
#include "core/utils/transform_utils.h"
#include "proto/graph_def.pb.h"
#include "proto/types.pb.h"
namespace bl = boost::leaf;
namespace arrow {
class Array;
}
namespace gs {
/**
 * @brief Map an arrow data type to the corresponding protobuf DataTypePb.
 *
 * time32, time64 and timestamp types are further distinguished by their time
 * unit. Both utf8 and large_utf8 map to STRING. List types are recognized
 * only as large_list of int32, int64, float32, float64 or large_utf8.
 *
 * Defined `inline` because this header-level definition would otherwise be
 * emitted once per including translation unit.
 *
 * @param type The arrow data type to translate.
 * @return The matching DataTypePb; UNKNOWN (after logging an error) when
 * `type` is null or has no counterpart.
 */
inline gs::rpc::graph::DataTypePb PropertyTypeToPb(
    vineyard::PropertyType type) {
  using gs::rpc::graph::DataTypePb;

  // A null type would pass every Equals() test below and then be dereferenced.
  if (type == nullptr) {
    LOG(ERROR) << "Unsupported arrow type: null";
    return DataTypePb::UNKNOWN;
  }

  if (arrow::boolean()->Equals(type)) {
    return DataTypePb::BOOL;
  } else if (arrow::int16()->Equals(type)) {
    return DataTypePb::SHORT;
  } else if (arrow::int32()->Equals(type)) {
    return DataTypePb::INT;
  } else if (arrow::int64()->Equals(type)) {
    return DataTypePb::LONG;
  } else if (arrow::uint32()->Equals(type)) {
    return DataTypePb::UINT;
  } else if (arrow::uint64()->Equals(type)) {
    return DataTypePb::ULONG;
  } else if (arrow::float32()->Equals(type)) {
    return DataTypePb::FLOAT;
  } else if (arrow::float64()->Equals(type)) {
    return DataTypePb::DOUBLE;
  } else if (arrow::utf8()->Equals(type)) {
    return DataTypePb::STRING;
  } else if (arrow::large_utf8()->Equals(type)) {
    return DataTypePb::STRING;
  } else if (arrow::date32()->Equals(type)) {
    return DataTypePb::DATE32;
  } else if (arrow::date64()->Equals(type)) {
    return DataTypePb::DATE64;
  } else if (type->id() == arrow::Type::TIME32) {
    auto time32_type = std::dynamic_pointer_cast<arrow::Time32Type>(type);
    switch (time32_type->unit()) {
    case arrow::TimeUnit::SECOND:
      return DataTypePb::TIME32_S;
    case arrow::TimeUnit::MILLI:
      return DataTypePb::TIME32_MS;
    case arrow::TimeUnit::MICRO:
      return DataTypePb::TIME32_US;
    case arrow::TimeUnit::NANO:
      return DataTypePb::TIME32_NS;
    }
  } else if (type->id() == arrow::Type::TIME64) {
    auto time64_type = std::dynamic_pointer_cast<arrow::Time64Type>(type);
    switch (time64_type->unit()) {
    case arrow::TimeUnit::SECOND:
      return DataTypePb::TIME64_S;
    case arrow::TimeUnit::MILLI:
      return DataTypePb::TIME64_MS;
    case arrow::TimeUnit::MICRO:
      return DataTypePb::TIME64_US;
    case arrow::TimeUnit::NANO:
      return DataTypePb::TIME64_NS;
    }
  } else if (type->id() == arrow::Type::TIMESTAMP) {
    auto timestamp_type =
        std::dynamic_pointer_cast<arrow::TimestampType>(type);
    switch (timestamp_type->unit()) {
    case arrow::TimeUnit::SECOND:
      return DataTypePb::TIMESTAMP_S;
    case arrow::TimeUnit::MILLI:
      return DataTypePb::TIMESTAMP_MS;
    case arrow::TimeUnit::MICRO:
      return DataTypePb::TIMESTAMP_US;
    case arrow::TimeUnit::NANO:
      return DataTypePb::TIMESTAMP_NS;
    }
  } else if (arrow::large_list(arrow::int32())->Equals(type)) {
    return DataTypePb::INT_LIST;
  } else if (arrow::large_list(arrow::int64())->Equals(type)) {
    return DataTypePb::LONG_LIST;
  } else if (arrow::large_list(arrow::float32())->Equals(type)) {
    return DataTypePb::FLOAT_LIST;
  } else if (arrow::large_list(arrow::float64())->Equals(type)) {
    return DataTypePb::DOUBLE_LIST;
  } else if (arrow::large_list(arrow::large_utf8())->Equals(type)) {
    return DataTypePb::STRING_LIST;
  } else if (arrow::null()->Equals(type)) {
    return DataTypePb::NULLVALUE;
  }

  // Also reached when a time unit switch above matched none of its cases.
  LOG(ERROR) << "Unsupported arrow type " << type->ToString();
  return DataTypePb::UNKNOWN;
}
/**
 * @brief Map a textual type name to the corresponding protobuf DataTypePb.
 *
 * Most names are matched exactly (several aliases are accepted per type, e.g.
 * "long", "int64" and "int64_t"). Timestamp names are matched by prefix, so
 * any string starting with "timestamp[s]", "timestamp[ms]", "timestamp[us]"
 * or "timestamp[ns]" is accepted.
 *
 * Defined `inline` because this header-level definition would otherwise be
 * emitted once per including translation unit.
 *
 * @param type The type name to translate.
 * @return The matching DataTypePb; UNKNOWN (after logging an error) when the
 * name is not recognized.
 */
inline gs::rpc::graph::DataTypePb PropertyTypeToPb(const std::string& type) {
  using gs::rpc::graph::DataTypePb;

  struct NamedType {
    const char* name;
    DataTypePb pb_type;
  };

  // Names that must match the input exactly.
  static const NamedType kExactNames[] = {
      {"bool", DataTypePb::BOOL},
      {"short", DataTypePb::SHORT},
      {"int16", DataTypePb::SHORT},
      {"int16_t", DataTypePb::SHORT},
      {"int", DataTypePb::INT},
      {"int32", DataTypePb::INT},
      {"int32_t", DataTypePb::INT},
      {"long", DataTypePb::LONG},
      {"int64", DataTypePb::LONG},
      {"int64_t", DataTypePb::LONG},
      {"uint", DataTypePb::UINT},
      {"uint32", DataTypePb::UINT},
      {"uint32_t", DataTypePb::UINT},
      {"ulong", DataTypePb::ULONG},
      {"uint64", DataTypePb::ULONG},
      {"uint64_t", DataTypePb::ULONG},
      {"float", DataTypePb::FLOAT},
      {"double", DataTypePb::DOUBLE},
      {"bytes", DataTypePb::BYTES},
      {"string", DataTypePb::STRING},
      {"std::string", DataTypePb::STRING},
      {"str", DataTypePb::STRING},
      {"int_list", DataTypePb::INT_LIST},
      {"long_list", DataTypePb::LONG_LIST},
      {"float_list", DataTypePb::FLOAT_LIST},
      {"double_list", DataTypePb::DOUBLE_LIST},
      {"string_list", DataTypePb::STRING_LIST},
      {"str_list", DataTypePb::STRING_LIST},
      {"date32[day]", DataTypePb::DATE32},
      {"date64[ms]", DataTypePb::DATE64},
      {"time32[s]", DataTypePb::TIME32_S},
      {"time32[ms]", DataTypePb::TIME32_MS},
      {"time32[us]", DataTypePb::TIME32_US},
      {"time32[ns]", DataTypePb::TIME32_NS},
      {"time64[s]", DataTypePb::TIME64_S},
      {"time64[ms]", DataTypePb::TIME64_MS},
      {"time64[us]", DataTypePb::TIME64_US},
      {"time64[ns]", DataTypePb::TIME64_NS},
      {"grape::EmptyType", DataTypePb::NULLVALUE},
      {"null", DataTypePb::NULLVALUE},
      {"dynamic::Value", DataTypePb::DYNAMIC},
  };
  for (const auto& candidate : kExactNames) {
    if (type == candidate.name) {
      return candidate.pb_type;
    }
  }

  // Names for which only the leading part of the input has to match. None of
  // the exact names above starts with "timestamp[", so the lookup order
  // between the two tables does not affect the result.
  static const NamedType kPrefixNames[] = {
      {"timestamp[s]", DataTypePb::TIMESTAMP_S},
      {"timestamp[ms]", DataTypePb::TIMESTAMP_MS},
      {"timestamp[us]", DataTypePb::TIMESTAMP_US},
      {"timestamp[ns]", DataTypePb::TIMESTAMP_NS},
  };
  for (const auto& candidate : kPrefixNames) {
    // rfind(s, 0) only considers a match at position 0, i.e. a prefix test
    // that needs no temporary string.
    if (type.rfind(candidate.name, 0) == 0) {
      return candidate.pb_type;
    }
  }

  LOG(ERROR) << "Unsupported type " << type;
  return DataTypePb::UNKNOWN;
}
/**
 * @brief Translate the kind of a schema entry into TypeEnumPb.
 *
 * Defined `inline` because this header-level definition would otherwise be
 * emitted once per including translation unit.
 *
 * @param type The entry kind as a string.
 * @return VERTEX when `type` is exactly "VERTEX"; EDGE for every other value
 * (the value is not checked against "EDGE").
 */
inline gs::rpc::graph::TypeEnumPb TypeToTypeEnum(const std::string& type) {
  return type == "VERTEX" ? gs::rpc::graph::TypeEnumPb::VERTEX
                          : gs::rpc::graph::TypeEnumPb::EDGE;
}
/**
 * @brief Fill a PropertyDefPb from a vineyard property definition.
 *
 * Copies the id and name, converts the data type with PropertyTypeToPb, and
 * marks the property as primary key when its name is listed in
 * `primary_keys`. The pk flag is left untouched otherwise.
 *
 * Defined `inline` because this header-level definition would otherwise be
 * emitted once per including translation unit.
 *
 * @param prop The source property definition.
 * @param primary_keys Names of the primary-key properties of the entry.
 * @param prop_def Output message to populate.
 */
inline void ToPropertyDef(const vineyard::Entry::PropertyDef& prop,
                          const std::vector<std::string>& primary_keys,
                          gs::rpc::graph::PropertyDefPb* prop_def) {
  prop_def->set_id(prop.id);
  prop_def->set_name(prop.name);
  prop_def->set_data_type(PropertyTypeToPb(prop.type));

  const bool is_primary_key =
      std::find(std::begin(primary_keys), std::end(primary_keys),
                prop.name) != std::end(primary_keys);
  if (is_primary_key) {
    prop_def->set_pk(true);
  }
}
/**
 * @brief Fill a TypeDefPb from a vineyard schema entry.
 *
 * Copies the label, label id and entry kind, then appends one property
 * definition per property returned by `entry.properties()`.
 *
 * Defined `inline` because this header-level definition would otherwise be
 * emitted once per including translation unit.
 *
 * @param entry The vertex or edge schema entry.
 * @param type_def Output message to populate.
 */
inline void ToTypeDef(const vineyard::Entry& entry,
                      gs::rpc::graph::TypeDefPb* type_def) {
  type_def->set_label(entry.label);
  type_def->mutable_label_id()->set_id(entry.id);
  type_def->set_type_enum(TypeToTypeEnum(entry.type));

  // Bind by const reference: avoids copying the primary key list, and extends
  // the lifetime of the property list if it is returned by value.
  const auto& properties = entry.properties();
  const auto& primary_keys = entry.primary_keys;
  for (const auto& prop : properties) {
    ToPropertyDef(prop, primary_keys, type_def->add_props());
  }
}
/**
 * @brief Fill an EdgeKindPb describing one (source label, edge label,
 * destination label) relation.
 *
 * Defined `inline` because this header-level definition would otherwise be
 * emitted once per including translation unit.
 *
 * @param label The edge label.
 * @param relation Pair of (source vertex label, destination vertex label).
 * @param edge_kind Output message to populate.
 */
inline void ToEdgeKind(const std::string& label,
                       const std::pair<std::string, std::string>& relation,
                       gs::rpc::graph::EdgeKindPb* edge_kind) {
  edge_kind->set_edge_label(label);
  edge_kind->set_src_vertex_label(relation.first);
  edge_kind->set_dst_vertex_label(relation.second);
}
/**
 * @brief Populate a GraphDefPb from an arrow property fragment.
 *
 * Sets the graph type to ARROW_PROPERTY, copies the fragment flags, appends
 * type definitions for all vertex and edge entries, appends one edge kind per
 * edge relation, fills the property-name-to-id map, and updates the
 * VineyardInfoPb extension (oid/vid type and schema json). An extension
 * already present on `graph_def` is unpacked first so its other fields are
 * carried over.
 *
 * @param fragment The fragment to describe.
 * @param graph_def The graph definition to update in place.
 */
inline void set_graph_def(
    const std::shared_ptr<vineyard::ArrowFragmentBase>& fragment,
    rpc::graph::GraphDefPb& graph_def) {
  auto& frag_meta = fragment->meta();
  const auto& frag_schema = fragment->schema();

  // Plain flags.
  graph_def.set_graph_type(rpc::graph::ARROW_PROPERTY);
  graph_def.set_directed(fragment->directed());
  graph_def.set_is_multigraph(fragment->is_multigraph());
  graph_def.set_compact_edges(fragment->compact_edges());
  graph_def.set_use_perfect_hash(fragment->use_perfect_hash());

  // Type definitions: all vertex entries first, then all edge entries. Edge
  // kinds live in a separate repeated field, so they can be appended while
  // walking the edge entries.
  auto vertex_entries = frag_schema.vertex_entries();
  auto edge_entries = frag_schema.edge_entries();
  for (const auto& vertex_entry : vertex_entries) {
    ToTypeDef(vertex_entry, graph_def.add_type_defs());
  }
  for (const auto& edge_entry : edge_entries) {
    ToTypeDef(edge_entry, graph_def.add_type_defs());
    for (const auto& relation : edge_entry.relations) {
      ToEdgeKind(edge_entry.label, relation, graph_def.add_edge_kinds());
    }
  }

  // Property name -> id mapping.
  auto* name_to_id = graph_def.mutable_property_name_to_id();
  for (const auto& name_and_id : frag_schema.GetPropertyNameToIDMapping()) {
    (*name_to_id)[name_and_id.first] = name_and_id.second;
  }

  // Vineyard specific information, stored in the extension field.
  gs::rpc::graph::VineyardInfoPb vineyard_info;
  if (graph_def.has_extension()) {
    graph_def.extension().UnpackTo(&vineyard_info);
  }
  vineyard_info.set_oid_type(PropertyTypeToPb(fragment->oid_typename()));
  vineyard_info.set_vid_type(PropertyTypeToPb(fragment->vid_typename()));

  vineyard::json schema_as_json;
  frag_meta.GetKeyValue("schema_json_", schema_as_json);
  vineyard_info.set_property_schema_json(schema_as_json.dump());

  graph_def.mutable_extension()->PackFrom(vineyard_info);
}
/**
 * @brief This is a fragment wrapper, which means a series of methods are
 * provided to serialize/transform the data attached to the fragment. An
 * AddColumn method is provided to add properties to create a new fragment from
 * the original one.
 *
 * The primary template is intentionally empty; the functionality lives in the
 * partial specializations for the concrete fragment types that follow in this
 * file.
 *
 * @tparam FRAG_T The fragment type being wrapped.
 */
template <typename FRAG_T>
class FragmentWrapper {};
/**
 * @brief A specialized FragmentWrapper for ArrowFragment.
 *
 * Besides exposing the wrapped fragment and its graph definition, the wrapper
 * derives new fragments (project, consolidate columns, add columns, transform
 * direction) and serializes vertex data into archives.
 *
 * @tparam OID_T OID type
 * @tparam VID_T VID type
 * @tparam VERTEX_MAP_T Vertex map type, forwarded to vineyard::ArrowFragment
 * @tparam COMPACT Boolean flag forwarded to vineyard::ArrowFragment
 */
template <typename OID_T, typename VID_T, typename VERTEX_MAP_T, bool COMPACT>
class FragmentWrapper<
    vineyard::ArrowFragment<OID_T, VID_T, VERTEX_MAP_T, COMPACT>>
    : public ILabeledFragmentWrapper {
  using fragment_t =
      vineyard::ArrowFragment<OID_T, VID_T, VERTEX_MAP_T, COMPACT>;
  using label_id_t = typename fragment_t::label_id_t;
  using prop_id_t = typename fragment_t::prop_id_t;

 public:
  /**
   * @brief Wrap an existing fragment together with its graph definition.
   *
   * The graph definition must have graph type ARROW_PROPERTY; this is
   * enforced with CHECK_EQ.
   *
   * @param id Identifier handed to the ILabeledFragmentWrapper base.
   * @param graph_def Graph definition describing the fragment.
   * @param fragment The fragment to wrap.
   */
  FragmentWrapper(const std::string& id, rpc::graph::GraphDefPb graph_def,
                  std::shared_ptr<fragment_t> fragment)
      : ILabeledFragmentWrapper(id),
        graph_def_(std::move(graph_def)),
        fragment_(std::move(fragment)) {
    CHECK_EQ(graph_def_.graph_type(), rpc::graph::ARROW_PROPERTY);
  }

  /** @brief Return the wrapped fragment as a type-erased shared pointer. */
  std::shared_ptr<void> fragment() const override {
    return std::static_pointer_cast<void>(fragment_);
  }

  /** @brief Return the graph definition (read-only). */
  const rpc::graph::GraphDefPb& graph_def() const override {
    return graph_def_;
  }

  /** @brief Return the graph definition for in-place modification. */
  rpc::graph::GraphDefPb& mutable_graph_def() override { return graph_def_; }

  /**
   * @brief Create a new wrapper over the same fragment under a new name.
   *
   * A fragment group is constructed for the current fragment and recorded
   * (vineyard id and member fragments) in the copied graph definition. The
   * fragment object itself is shared with this wrapper, not duplicated.
   *
   * @param comm_spec Communication spec used to construct the fragment group.
   * @param dst_graph_name Key/id of the resulting graph.
   * @param copy_type Not used by this implementation.
   * @return The new wrapper, or an error from fragment group construction.
   */
  bl::result<std::shared_ptr<IFragmentWrapper>> CopyGraph(
      const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
      const std::string& copy_type) override {
    auto& meta = fragment_->meta();
    auto* client = dynamic_cast<vineyard::Client*>(meta.GetClient());
    BOOST_LEAF_AUTO(frag_group_id, vineyard::ConstructFragmentGroup(
                                       *client, fragment_->id(), comm_spec));
    auto fg = std::dynamic_pointer_cast<vineyard::ArrowFragmentGroup>(
        client->GetObject(frag_group_id));

    auto dst_graph_def = graph_def_;
    dst_graph_def.set_key(dst_graph_name);

    gs::rpc::graph::VineyardInfoPb vy_info;
    if (dst_graph_def.has_extension()) {
      dst_graph_def.extension().UnpackTo(&vy_info);
    }
    vy_info.set_vineyard_id(frag_group_id);
    vy_info.clear_fragments();
    for (auto const& item : fg->Fragments()) {
      vy_info.add_fragments(item.second);
    }
    dst_graph_def.mutable_extension()->PackFrom(vy_info);

    auto wrapper = std::make_shared<FragmentWrapper<fragment_t>>(
        dst_graph_name, dst_graph_def, fragment_);
    return std::dynamic_pointer_cast<IFragmentWrapper>(wrapper);
  }

  /**
   * @brief Report graph information through ArrowFragmentReporter.
   *
   * Only available when built with NETWORKX defined; otherwise an
   * kInvalidOperationError is returned.
   *
   * @param comm_spec Communication spec handed to the reporter.
   * @param params Request parameters; DEFAULT_LABEL_ID is read from them.
   * @return The archive produced by the reporter, or an error.
   */
  bl::result<std::unique_ptr<grape::InArchive>> ReportGraph(
      const grape::CommSpec& comm_spec, const rpc::GSParams& params) override {
#ifdef NETWORKX
    BOOST_LEAF_AUTO(default_label_id,
                    params.Get<int64_t>(rpc::DEFAULT_LABEL_ID));
    ArrowFragmentReporter<fragment_t> reporter(comm_spec, default_label_id);
    return reporter.Report(fragment_, params);
#else
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "GraphScope is built with NETWORKX=OFF, please recompile "
                    "it with NETWORKX=ON");
#endif
  }

  /**
   * @brief Project the fragment onto the given vertex/edge labels and
   * properties and wrap the result.
   *
   * @param comm_spec Communication spec used to construct the fragment group.
   * @param dst_graph_name Key/id of the resulting graph.
   * @param vertices Map handed to ArrowFragment::Project for vertices.
   * @param edges Map handed to ArrowFragment::Project for edges.
   * @return Wrapper over the projected fragment, or an error.
   */
  bl::result<std::shared_ptr<ILabeledFragmentWrapper>> Project(
      const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
      const std::map<int, std::vector<int>>& vertices,
      const std::map<int, std::vector<int>>& edges) override {
    auto& meta = fragment_->meta();
    auto* client = dynamic_cast<vineyard::Client*>(meta.GetClient());
    BOOST_LEAF_AUTO(new_frag_id, fragment_->Project(*client, vertices, edges));
    BOOST_LEAF_AUTO(wrapper, WrapDerivedFragment(client, new_frag_id,
                                                 comm_spec, dst_graph_name));
    return std::dynamic_pointer_cast<ILabeledFragmentWrapper>(wrapper);
  }

  /**
   * @brief Consolidate several columns of one label into a single column.
   *
   * `label` is looked up both as a vertex label and as an edge label; the
   * vertex label takes precedence when both lookups succeed.
   *
   * @param comm_spec Communication spec used to construct the fragment group.
   * @param dst_graph_name Key/id of the resulting graph.
   * @param label Vertex or edge label whose columns are consolidated.
   * @param columns Column names separated by ',' or ';'.
   * @param result_column Name of the consolidated column.
   * @return Wrapper over the new fragment; kInvalidValueError when `label` is
   * neither a vertex nor an edge label.
   */
  bl::result<std::shared_ptr<ILabeledFragmentWrapper>> ConsolidateColumns(
      const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
      const std::string& label, const std::string& columns,
      const std::string& result_column) override {
    auto& schema = fragment_->schema();
    label_id_t vertex_label_id = schema.GetVertexLabelId(label);
    label_id_t edge_label_id = schema.GetEdgeLabelId(label);

    std::vector<std::string> column_names;
    boost::split(column_names, columns, boost::is_any_of(",;"));

    if (vertex_label_id == -1 && edge_label_id == -1) {
      RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                      "Invalid vertex or edge label: " + label);
    }

    auto& meta = fragment_->meta();
    auto* client = dynamic_cast<vineyard::Client*>(meta.GetClient());
    vineyard::ObjectID new_frag_id = vineyard::InvalidObjectID();
    if (vertex_label_id != -1) {
      BOOST_LEAF_ASSIGN(new_frag_id, fragment_->ConsolidateVertexColumns(
                                         *client, vertex_label_id, column_names,
                                         result_column));
    } else if (edge_label_id != -1) {
      BOOST_LEAF_ASSIGN(new_frag_id, fragment_->ConsolidateEdgeColumns(
                                         *client, edge_label_id, column_names,
                                         result_column));
    }

    BOOST_LEAF_AUTO(wrapper, WrapDerivedFragment(client, new_frag_id,
                                                 comm_spec, dst_graph_name));
    return std::dynamic_pointer_cast<ILabeledFragmentWrapper>(wrapper);
  }

  /**
   * @brief Add the selected columns of a context as vertex properties,
   * producing a new fragment.
   *
   * Steps:
   *  1. Reject context types other than the vertex data/property ones (and,
   *     with ENABLE_JAVA_SDK, the java PIE property/projected ones).
   *  2. Fetch the vertex map id of the fragment the context was computed on.
   *  3. Convert the selected context data into arrow arrays keyed by label.
   *  4. Verify that the context's vertex map agrees with this fragment's
   *     vertex map (same fnum, same o2g / oid array members).
   *  5. Add the columns and wrap the new fragment.
   *
   * @param comm_spec Communication spec.
   * @param dst_graph_name Key/id of the resulting graph.
   * @param ctx_wrapper The context providing the data.
   * @param s_selectors Selector string, parsed according to the context type.
   * @return Wrapper over the new fragment; kIllegalStateError when the context
   * type is not supported or the vertex maps do not agree.
   */
  bl::result<std::shared_ptr<ILabeledFragmentWrapper>> AddColumn(
      const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
      std::shared_ptr<IContextWrapper>& ctx_wrapper,
      const std::string& s_selectors) override {
    const auto& context_type = ctx_wrapper->context_type();
    auto& meta = fragment_->meta();
    auto* client = dynamic_cast<vineyard::Client*>(meta.GetClient());

    if (context_type != CONTEXT_TYPE_VERTEX_DATA &&
        context_type != CONTEXT_TYPE_LABELED_VERTEX_DATA &&
        context_type != CONTEXT_TYPE_VERTEX_PROPERTY &&
        context_type != CONTEXT_TYPE_LABELED_VERTEX_PROPERTY
#ifdef ENABLE_JAVA_SDK
        && (context_type.find(CONTEXT_TYPE_JAVA_PIE_PROPERTY) ==
            std::string::npos) &&
        (context_type.find(CONTEXT_TYPE_JAVA_PIE_PROJECTED) ==
         std::string::npos)
#endif
    ) {
      RETURN_GS_ERROR(vineyard::ErrorCode::kIllegalStateError,
                      "Illegal context type: " + context_type);
    }

    // Vertex map id of the fragment the context belongs to. For a projected
    // fragment it is taken from the underlying "arrow_fragment" member. It
    // stays 0 for any other graph type.
    auto frag_wrapper = ctx_wrapper->fragment_wrapper();
    auto graph_type = frag_wrapper->graph_def().graph_type();
    vineyard::ObjectID vm_id_from_ctx = 0;

    if (graph_type == rpc::graph::ARROW_PROPERTY) {
      vm_id_from_ctx =
          std::static_pointer_cast<const vineyard::ArrowFragmentBase>(
              frag_wrapper->fragment())
              ->vertex_map_id();
    } else if (graph_type == rpc::graph::ARROW_PROJECTED) {
      auto& proj_meta =
          std::static_pointer_cast<const ArrowProjectedFragmentBase>(
              frag_wrapper->fragment())
              ->meta();
      const auto& frag_meta = proj_meta.GetMemberMeta("arrow_fragment");

      vm_id_from_ctx =
          client->GetObject<vineyard::ArrowFragmentBase>(frag_meta.GetId())
              ->vertex_map_id();
    }

    // Columns to add, keyed by vertex label id. The unlabeled context types
    // read the label from the "projected_v_label" key of the context
    // fragment's meta, which is accessed as an ArrowProjectedFragmentBase.
    std::map<label_id_t,
             std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>>
        columns;

    if (context_type == CONTEXT_TYPE_VERTEX_DATA) {
      auto vd_ctx_wrapper =
          std::dynamic_pointer_cast<IVertexDataContextWrapper>(ctx_wrapper);
      auto& proj_meta =
          std::static_pointer_cast<const ArrowProjectedFragmentBase>(
              frag_wrapper->fragment())
              ->meta();
      auto v_label_id = proj_meta.GetKeyValue<label_id_t>("projected_v_label");

      BOOST_LEAF_AUTO(selectors, Selector::ParseSelectors(s_selectors));
      BOOST_LEAF_AUTO(arrow_arrays,
                      vd_ctx_wrapper->ToArrowArrays(comm_spec, selectors));
      columns[v_label_id] = arrow_arrays;
    } else if (context_type == CONTEXT_TYPE_LABELED_VERTEX_DATA) {
      auto lvd_ctx_wrapper =
          std::dynamic_pointer_cast<ILabeledVertexDataContextWrapper>(
              ctx_wrapper);
      BOOST_LEAF_AUTO(selectors, LabeledSelector::ParseSelectors(s_selectors));
      BOOST_LEAF_ASSIGN(columns,
                        lvd_ctx_wrapper->ToArrowArrays(comm_spec, selectors));
    } else if (context_type == CONTEXT_TYPE_VERTEX_PROPERTY) {
      auto vp_ctx_wrapper =
          std::dynamic_pointer_cast<IVertexPropertyContextWrapper>(ctx_wrapper);
      auto& proj_meta =
          std::static_pointer_cast<const ArrowProjectedFragmentBase>(
              frag_wrapper->fragment())
              ->meta();
      auto v_label_id = proj_meta.GetKeyValue<label_id_t>("projected_v_label");

      BOOST_LEAF_AUTO(selectors, Selector::ParseSelectors(s_selectors));
      BOOST_LEAF_AUTO(arrow_arrays,
                      vp_ctx_wrapper->ToArrowArrays(comm_spec, selectors));
      columns[v_label_id] = arrow_arrays;
    } else if (context_type == CONTEXT_TYPE_LABELED_VERTEX_PROPERTY) {
      auto vp_ctx_wrapper =
          std::dynamic_pointer_cast<ILabeledVertexPropertyContextWrapper>(
              ctx_wrapper);
      BOOST_LEAF_AUTO(selectors, LabeledSelector::ParseSelectors(s_selectors));
      BOOST_LEAF_ASSIGN(columns,
                        vp_ctx_wrapper->ToArrowArrays(comm_spec, selectors));
#ifdef ENABLE_JAVA_SDK
    } else if (context_type.find(CONTEXT_TYPE_JAVA_PIE_PROPERTY) !=
               std::string::npos) {
      // The java context type is expected to have exactly two ':' separated
      // parts.
      std::vector<std::string> outer_and_inner;
      boost::split(outer_and_inner, context_type, boost::is_any_of(":"));
      if (outer_and_inner.size() != 2) {
        RETURN_GS_ERROR(vineyard::ErrorCode::kIllegalStateError,
                        "Unsupported java property context type: " +
                            std::string(context_type));
      }
      auto vp_ctx_wrapper =
          std::dynamic_pointer_cast<IJavaPIEPropertyContextWrapper>(
              ctx_wrapper);
      BOOST_LEAF_AUTO(selectors, LabeledSelector::ParseSelectors(s_selectors));
      BOOST_LEAF_ASSIGN(columns,
                        vp_ctx_wrapper->ToArrowArrays(comm_spec, selectors));
    } else if (context_type.find(CONTEXT_TYPE_JAVA_PIE_PROJECTED) !=
               std::string::npos) {
      std::vector<std::string> outer_and_inner;
      boost::split(outer_and_inner, context_type, boost::is_any_of(":"));
      if (outer_and_inner.size() != 2) {
        RETURN_GS_ERROR(vineyard::ErrorCode::kIllegalStateError,
                        "Unsupported java projected context type: " +
                            std::string(context_type));
      }
      auto vp_ctx_wrapper =
          std::dynamic_pointer_cast<IJavaPIEProjectedContextWrapper>(
              ctx_wrapper);
      auto& proj_meta =
          std::static_pointer_cast<const ArrowProjectedFragmentBase>(
              frag_wrapper->fragment())
              ->meta();
      auto v_label_id = proj_meta.GetKeyValue<label_id_t>("projected_v_label");

      BOOST_LEAF_AUTO(selectors, Selector::ParseSelectors(s_selectors));
      BOOST_LEAF_AUTO(arrow_arrays,
                      vp_ctx_wrapper->ToArrowArrays(comm_spec, selectors));
      columns[v_label_id] = arrow_arrays;
#endif
    }

    // Compare the vertex map behind the context with the vertex map of this
    // fragment before adding the columns.
    vineyard::ObjectMeta ctx_meta, cur_meta;
    VINEYARD_CHECK_OK(client->GetMetaData(vm_id_from_ctx, ctx_meta));
    VINEYARD_CHECK_OK(
        client->GetMetaData(fragment_->vertex_map_id(), cur_meta));
    auto ctx_fnum = ctx_meta.GetKeyValue<fid_t>("fnum");
    auto cur_fnum = cur_meta.GetKeyValue<fid_t>("fnum");
    if (ctx_fnum != cur_fnum) {
      RETURN_GS_ERROR(
          vineyard::ErrorCode::kIllegalStateError,
          "Fragment number of context differ from the destination fragment");
    }

    for (const auto& pair : columns) {
      if (fragment_->schema().GetVertexLabelName(pair.first).empty()) {
        RETURN_GS_ERROR(vineyard::ErrorCode::kIllegalStateError,
                        "Label id " + std::to_string(pair.first) +
                            " is invalid in the destination fragment");
      }
      for (fid_t i = 0; i < cur_fnum; ++i) {
        // The o2g member is only compared when both metas carry it.
        auto name =
            "o2g_" + std::to_string(i) + "_" + std::to_string(pair.first);
        if (ctx_meta.HasKey(name) && cur_meta.HasKey(name)) {
          auto id_in_ctx = ctx_meta.GetMemberMeta(name).GetId();
          auto id_in_cur = cur_meta.GetMemberMeta(name).GetId();
          if (id_in_ctx != id_in_cur) {
            RETURN_GS_ERROR(vineyard::ErrorCode::kIllegalStateError,
                            "OID to GID mapping '" + name +
                                "' in context differ from vertex map of the "
                                "destination fragment");
          }
        }

        // The oid array member is compared unconditionally.
        name = "oid_arrays_" + std::to_string(i) + "_" +
               std::to_string(pair.first);
        auto id_in_ctx = ctx_meta.GetMemberMeta(name).GetId();
        auto id_in_cur = cur_meta.GetMemberMeta(name).GetId();
        if (id_in_ctx != id_in_cur) {
          RETURN_GS_ERROR(vineyard::ErrorCode::kIllegalStateError,
                          "OID array '" + name +
                              "' in context differs from vertex map of the "
                              "destination fragment");
        }
      }
    }

    BOOST_LEAF_AUTO(new_frag_id, fragment_->AddVertexColumns(*client, columns));
    BOOST_LEAF_AUTO(wrapper, WrapDerivedFragment(client, new_frag_id,
                                                 comm_spec, dst_graph_name));
    return std::dynamic_pointer_cast<ILabeledFragmentWrapper>(wrapper);
  }

  /**
   * @brief Serialize the vertex ids or one vertex property of a label into an
   * archive.
   *
   * The worker holding fragment 0 acts as the MPI reduction root. It writes,
   * in order: the int64 value 1, the total vertex count, the data type id and
   * the total vertex count again. Every worker then serializes its local
   * values and gather_archives is called.
   *
   * @param comm_spec Communication spec.
   * @param selector Selects the label and either vertex id or vertex data.
   * @param range Vertex range handed to TransformUtils::SelectVertices.
   * @return The archive; kInvalidValueError when the property id is out of
   * range; kUnsupportedOperationError for other selector types.
   */
  bl::result<std::unique_ptr<grape::InArchive>> ToNdArray(
      const grape::CommSpec& comm_spec, const LabeledSelector& selector,
      const std::pair<std::string, std::string>& range) override {
    TransformUtils<fragment_t> trans_utils(comm_spec, *fragment_);
    auto label_id = selector.label_id();
    auto vertices = trans_utils.SelectVertices(label_id, range);
    auto arc = std::make_unique<grape::InArchive>();
    auto local_num = static_cast<int64_t>(vertices.size());
    // Only meaningful on the worker holding fragment 0 (the reduction root).
    int64_t total_num = 0;

    if (comm_spec.fid() == 0) {
      MPI_Reduce(&local_num, &total_num, 1, MPI_INT64_T, MPI_SUM,
                 comm_spec.worker_id(), comm_spec.comm());
      *arc << static_cast<int64_t>(1);
      *arc << total_num;
    } else {
      MPI_Reduce(&local_num, nullptr, 1, MPI_INT64_T, MPI_SUM,
                 comm_spec.FragToWorker(0), comm_spec.comm());
    }
    // Archive size before the local payload is appended; handed to
    // gather_archives.
    size_t old_size;

    switch (selector.type()) {
    case SelectorType::kVertexId: {
      BOOST_LEAF_AUTO(oid_type, trans_utils.GetOidTypeId());
      if (comm_spec.fid() == 0) {
        *arc << static_cast<int>(oid_type);
        *arc << total_num;
      }
      old_size = arc->GetSize();
      trans_utils.SerializeVertexId(vertices, *arc);
      break;
    }
    case SelectorType::kVertexData: {
      auto prop_id = selector.property_id();
      auto vertex_prop_num = fragment_->vertex_property_num(label_id);

      if (prop_id >= vertex_prop_num) {
        RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                        "property id out of range: " + std::to_string(prop_id));
      }
      if (comm_spec.fid() == 0) {
        *arc << vineyard::ArrowDataTypeToInt(
            fragment_->vertex_property_type(label_id, prop_id));
        *arc << total_num;
      }
      old_size = arc->GetSize();
      BOOST_LEAF_CHECK(trans_utils.SerializeVertexProperty(vertices, label_id,
                                                           prop_id, *arc));
      break;
    }
    default:
      RETURN_GS_ERROR(vineyard::ErrorCode::kUnsupportedOperationError,
                      "Unsupported operation, available selector type: "
                      "vid,vdata selector: " +
                          selector.str());
    }
    gather_archives(*arc, comm_spec, old_size);

    return std::move(arc);
  }

  /**
   * @brief Serialize several columns (vertex ids and/or vertex properties) of
   * one label into an archive.
   *
   * The worker holding fragment 0 writes the number of columns and the total
   * vertex count first. For each selector it then writes the column name and
   * data type id, after which every worker serializes its local values and
   * gather_archives is called.
   *
   * @param comm_spec Communication spec.
   * @param selectors (column name, selector) pairs.
   * @param range Vertex range handed to TransformUtils::SelectVertices.
   * @return The archive; kUnsupportedOperationError for selector types other
   * than vertex id / vertex data.
   */
  bl::result<std::unique_ptr<grape::InArchive>> ToDataframe(
      const grape::CommSpec& comm_spec,
      const std::vector<std::pair<std::string, LabeledSelector>>& selectors,
      const std::pair<std::string, std::string>& range) override {
    TransformUtils<fragment_t> trans_utils(comm_spec, *fragment_);

    BOOST_LEAF_AUTO(label_id, LabeledSelector::GetVertexLabelId(selectors));
    auto vertices = trans_utils.SelectVertices(label_id, range);
    auto arc = std::make_unique<grape::InArchive>();
    auto local_num = static_cast<int64_t>(vertices.size());

    if (comm_spec.fid() == 0) {
      int64_t total_num = 0;
      MPI_Reduce(&local_num, &total_num, 1, MPI_INT64_T, MPI_SUM,
                 comm_spec.worker_id(), comm_spec.comm());
      *arc << static_cast<int64_t>(selectors.size());
      *arc << total_num;
    } else {
      MPI_Reduce(&local_num, nullptr, 1, MPI_INT64_T, MPI_SUM,
                 comm_spec.FragToWorker(0), comm_spec.comm());
    }

    for (auto& pair : selectors) {
      auto& col_name = pair.first;
      auto& selector = pair.second;

      if (comm_spec.fid() == 0) {
        *arc << col_name;
      }

      // Archive size before the local payload of this column is appended.
      size_t old_size;

      switch (selector.type()) {
      case SelectorType::kVertexId: {
        if (comm_spec.fid() == 0) {
          *arc << static_cast<int>(vineyard::TypeToInt<OID_T>::value);
        }
        old_size = arc->GetSize();
        trans_utils.SerializeVertexId(vertices, *arc);
        break;
      }
      case SelectorType::kVertexData: {
        if (comm_spec.fid() == 0) {
          *arc << vineyard::ArrowDataTypeToInt(fragment_->vertex_property_type(
              label_id, selector.property_id()));
        }
        old_size = arc->GetSize();
        BOOST_LEAF_CHECK(trans_utils.SerializeVertexProperty(
            vertices, label_id, selector.property_id(), *arc));
        break;
      }
      default:
        RETURN_GS_ERROR(
            vineyard::ErrorCode::kUnsupportedOperationError,
            "Unsupported operation, available selector type: vid,vdata "
            "and result. selector: " +
                selector.str());
      }
      gather_archives(*arc, comm_spec, old_size);
    }
    return std::move(arc);
  }

  /**
   * @brief Produce a new fragment through ArrowFragment::TransformDirection
   * and wrap it.
   *
   * The hardware threads are divided evenly (rounding up) among the workers
   * on this host to obtain the thread count for the transformation.
   *
   * @param comm_spec Communication spec.
   * @param dst_graph_name Key/id of the resulting graph.
   * @return Wrapper over the transformed fragment, or an error.
   */
  bl::result<std::shared_ptr<IFragmentWrapper>> ToDirected(
      const grape::CommSpec& comm_spec,
      const std::string& dst_graph_name) override {
    auto& meta = fragment_->meta();
    auto* client = dynamic_cast<vineyard::Client*>(meta.GetClient());

    // hardware_concurrency() may return 0 when the value is not computable,
    // which would yield a thread count of 0; always use at least one thread.
    const int thread_num = std::max(
        1, static_cast<int>((std::thread::hardware_concurrency() +
                             comm_spec.local_num() - 1) /
                            comm_spec.local_num()));

    BOOST_LEAF_AUTO(new_frag_id,
                    fragment_->TransformDirection(*client, thread_num));
    BOOST_LEAF_AUTO(wrapper, WrapDerivedFragment(client, new_frag_id,
                                                 comm_spec, dst_graph_name));
    return std::dynamic_pointer_cast<IFragmentWrapper>(wrapper);
  }

  /**
   * @brief Delegates to ToDirected, i.e. the same TransformDirection call is
   * used for both conversions.
   */
  bl::result<std::shared_ptr<IFragmentWrapper>> ToUndirected(
      const grape::CommSpec& comm_spec,
      const std::string& dst_graph_name) override {
    return ToDirected(comm_spec, dst_graph_name);
  }

  /** @brief Not supported for ArrowFragment; always returns an error. */
  bl::result<std::shared_ptr<IFragmentWrapper>> CreateGraphView(
      const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
      const std::string& copy_type) override {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "Cannot generate a graph view over the ArrowFragment.");
  }

 private:
  /**
   * @brief Persist a freshly derived fragment, build its fragment group and
   * wrap it together with a matching graph definition.
   *
   * The graph definition gets `dst_graph_name` as key. Its VineyardInfoPb
   * starts from this wrapper's extension (if any), with the vineyard id and
   * the fragment list replaced by those of the new fragment group; the
   * remaining fields are filled in by set_graph_def.
   *
   * @param client Vineyard client used to persist and fetch objects.
   * @param new_frag_id Object id of the derived fragment.
   * @param comm_spec Communication spec used to construct the fragment group.
   * @param dst_graph_name Key/id of the resulting graph.
   * @return Wrapper over the derived fragment, or an error from fragment
   * group construction.
   */
  bl::result<std::shared_ptr<FragmentWrapper<fragment_t>>> WrapDerivedFragment(
      vineyard::Client* client, vineyard::ObjectID new_frag_id,
      const grape::CommSpec& comm_spec, const std::string& dst_graph_name) {
    VINEYARD_CHECK_OK(client->Persist(new_frag_id));
    BOOST_LEAF_AUTO(frag_group_id, vineyard::ConstructFragmentGroup(
                                       *client, new_frag_id, comm_spec));
    auto fg = std::dynamic_pointer_cast<vineyard::ArrowFragmentGroup>(
        client->GetObject(frag_group_id));
    auto new_frag = client->GetObject<fragment_t>(new_frag_id);

    rpc::graph::GraphDefPb new_graph_def;
    new_graph_def.set_key(dst_graph_name);
    new_graph_def.set_compact_edges(new_frag->compact_edges());
    new_graph_def.set_use_perfect_hash(new_frag->use_perfect_hash());

    gs::rpc::graph::VineyardInfoPb vy_info;
    if (graph_def_.has_extension()) {
      graph_def_.extension().UnpackTo(&vy_info);
    }
    vy_info.set_vineyard_id(frag_group_id);
    vy_info.clear_fragments();
    for (auto const& item : fg->Fragments()) {
      vy_info.add_fragments(item.second);
    }
    new_graph_def.mutable_extension()->PackFrom(vy_info);

    // Must run after the extension is packed: set_graph_def unpacks it and
    // adds the oid/vid types and the schema json.
    set_graph_def(new_frag, new_graph_def);

    return std::make_shared<FragmentWrapper<fragment_t>>(
        dst_graph_name, new_graph_def, new_frag);
  }

  rpc::graph::GraphDefPb graph_def_;
  std::shared_ptr<fragment_t> fragment_;
};
/**
 * @brief A specialized FragmentWrapper for ArrowProjectedFragment.
 *
 * Only access to the wrapped fragment and its graph definition is provided;
 * every graph-transforming operation returns kInvalidOperationError.
 *
 * @tparam OID_T OID type
 * @tparam VID_T VID type
 * @tparam VDATA_T Vertex data type
 * @tparam EDATA_T Edge data type
 * @tparam VERTEX_MAP_T Vertex map type, forwarded to ArrowProjectedFragment
 * @tparam COMPACT Boolean flag forwarded to ArrowProjectedFragment
 */
template <typename OID_T, typename VID_T, typename VDATA_T, typename EDATA_T,
          typename VERTEX_MAP_T, bool COMPACT>
class FragmentWrapper<ArrowProjectedFragment<OID_T, VID_T, VDATA_T, EDATA_T,
                                             VERTEX_MAP_T, COMPACT>>
    : public IFragmentWrapper {
  using fragment_t = ArrowProjectedFragment<OID_T, VID_T, VDATA_T, EDATA_T,
                                            VERTEX_MAP_T, COMPACT>;

 public:
  /**
   * @brief Wrap an existing projected fragment with its graph definition.
   *
   * The graph definition must have graph type ARROW_PROJECTED; this is
   * enforced with CHECK_EQ.
   *
   * @param id Identifier handed to the IFragmentWrapper base.
   * @param graph_def Graph definition describing the fragment.
   * @param fragment The fragment to wrap.
   */
  FragmentWrapper(const std::string& id, rpc::graph::GraphDefPb graph_def,
                  std::shared_ptr<fragment_t> fragment)
      : IFragmentWrapper(id),
        graph_def_(std::move(graph_def)),
        fragment_(std::move(fragment)) {
    CHECK_EQ(graph_def_.graph_type(), rpc::graph::ARROW_PROJECTED);
  }

  /** @brief Return the wrapped fragment as a type-erased shared pointer. */
  std::shared_ptr<void> fragment() const override {
    return std::static_pointer_cast<void>(fragment_);
  }

  /** @brief Return the graph definition (read-only). */
  const rpc::graph::GraphDefPb& graph_def() const override {
    return graph_def_;
  }

  /** @brief Return the graph definition for in-place modification. */
  rpc::graph::GraphDefPb& mutable_graph_def() override { return graph_def_; }

  /** @brief Not supported; always returns kInvalidOperationError. */
  bl::result<std::shared_ptr<IFragmentWrapper>> CopyGraph(
      const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
      const std::string& copy_type) override {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "Cannot copy the ArrowProjectedFragment");
  }

  /** @brief Not implemented; always returns kInvalidOperationError. */
  bl::result<std::unique_ptr<grape::InArchive>> ReportGraph(
      const grape::CommSpec& comm_spec, const rpc::GSParams& params) override {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "Not implemented.");
  }

  /** @brief Not supported; always returns kInvalidOperationError. */
  bl::result<std::shared_ptr<IFragmentWrapper>> ToDirected(
      const grape::CommSpec& comm_spec,
      const std::string& dst_graph_name) override {
    // The message names the fragment type this wrapper actually holds.
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "Cannot convert to the directed ArrowProjectedFragment");
  }

  /** @brief Not supported; always returns kInvalidOperationError. */
  bl::result<std::shared_ptr<IFragmentWrapper>> ToUndirected(
      const grape::CommSpec& comm_spec,
      const std::string& dst_graph_name) override {
    // The message names the fragment type this wrapper actually holds.
    RETURN_GS_ERROR(
        vineyard::ErrorCode::kInvalidOperationError,
        "Cannot convert to the undirected ArrowProjectedFragment");
  }

  /** @brief Not supported; always returns kInvalidOperationError. */
  bl::result<std::shared_ptr<IFragmentWrapper>> CreateGraphView(
      const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
      const std::string& copy_type) override {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "Cannot generate a view over the ArrowProjectedFragment");
  }

 private:
  rpc::graph::GraphDefPb graph_def_;
  std::shared_ptr<fragment_t> fragment_;
};
/**
 * @brief FragmentWrapper specialization holding an ArrowFlattenedFragment.
 *
 * Only the fragment and its graph definition are exposed. Reporting, copying,
 * direction conversion and view creation all return kInvalidOperationError.
 */
template <typename OID_T, typename VID_T, typename VDATA_T, typename EDATA_T,
          typename VERTEX_MAP_T>
class FragmentWrapper<
    ArrowFlattenedFragment<OID_T, VID_T, VDATA_T, EDATA_T, VERTEX_MAP_T>>
    : public IFragmentWrapper {
  using fragment_t =
      ArrowFlattenedFragment<OID_T, VID_T, VDATA_T, EDATA_T, VERTEX_MAP_T>;
  // Result type shared by every operation that would yield a new wrapper.
  using wrapper_result_t = bl::result<std::shared_ptr<IFragmentWrapper>>;

 public:
  // Stores the graph definition and fragment, then checks (CHECK_EQ) that
  // the definition's graph_type is ARROW_FLATTENED.
  FragmentWrapper(const std::string& id, rpc::graph::GraphDefPb graph_def,
                  std::shared_ptr<fragment_t> fragment)
      : IFragmentWrapper(id),
        graph_def_(std::move(graph_def)),
        fragment_(std::move(fragment)) {
    CHECK_EQ(graph_def_.graph_type(), rpc::graph::ARROW_FLATTENED);
  }

  // --- accessors ---------------------------------------------------------

  // Type-erased handle to the wrapped fragment (shares ownership).
  std::shared_ptr<void> fragment() const override { return fragment_; }

  const rpc::graph::GraphDefPb& graph_def() const override {
    return graph_def_;
  }

  rpc::graph::GraphDefPb& mutable_graph_def() override { return graph_def_; }

  // --- unsupported operations: each returns kInvalidOperationError --------

  wrapper_result_t CopyGraph(const grape::CommSpec& comm_spec,
                             const std::string& dst_graph_name,
                             const std::string& copy_type) override {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "Cannot copy the ArrowFlattenedFragment");
  }

  bl::result<std::unique_ptr<grape::InArchive>> ReportGraph(
      const grape::CommSpec& comm_spec, const rpc::GSParams& params) override {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "Not implemented.");
  }

  wrapper_result_t ToDirected(const grape::CommSpec& comm_spec,
                              const std::string& dst_graph_name) override {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "Cannot convert to the directed ArrowFlattenedFragment");
  }

  wrapper_result_t ToUndirected(const grape::CommSpec& comm_spec,
                                const std::string& dst_graph_name) override {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "Cannot convert to the undirected ArrowFlattenedFragment");
  }

  wrapper_result_t CreateGraphView(const grape::CommSpec& comm_spec,
                                   const std::string& dst_graph_name,
                                   const std::string& copy_type) override {
    RETURN_GS_ERROR(
        vineyard::ErrorCode::kInvalidOperationError,
        "Cannot generate a graph view over the ArrowFlattenedFragment.");
  }

 private:
  rpc::graph::GraphDefPb graph_def_;
  std::shared_ptr<fragment_t> fragment_;
};
/**
 * @brief A specialized FragmentWrapper for ImmutableEdgecutFragment.
 *
 * The wrapper exposes the fragment and its graph definition. Every other
 * operation (copy, report, direction conversion, view creation) is
 * unsupported and returns kInvalidOperationError.
 *
 * @tparam OID_T OID type
 * @tparam VID_T VID type
 * @tparam VDATA_T forwarded unchanged to grape::ImmutableEdgecutFragment
 * @tparam EDATA_T forwarded unchanged to grape::ImmutableEdgecutFragment
 */
template <typename OID_T, typename VID_T, typename VDATA_T, typename EDATA_T>
class FragmentWrapper<
    grape::ImmutableEdgecutFragment<OID_T, VID_T, VDATA_T, EDATA_T>>
    : public IFragmentWrapper {
  using fragment_t =
      grape::ImmutableEdgecutFragment<OID_T, VID_T, VDATA_T, EDATA_T>;

 public:
  /**
   * @param id identifier handed to the IFragmentWrapper base
   * @param graph_def graph definition; its graph_type is checked (CHECK_EQ)
   * to be IMMUTABLE_EDGECUT
   * @param fragment the fragment to wrap
   */
  FragmentWrapper(const std::string& id, rpc::graph::GraphDefPb graph_def,
                  std::shared_ptr<fragment_t> fragment)
      : IFragmentWrapper(id),
        graph_def_(std::move(graph_def)),
        fragment_(std::move(fragment)) {
    CHECK_EQ(graph_def_.graph_type(), rpc::graph::IMMUTABLE_EDGECUT);
  }

  // Returns the wrapped fragment as a type-erased shared pointer.
  std::shared_ptr<void> fragment() const override {
    return std::static_pointer_cast<void>(fragment_);
  }

  // Read-only access to the stored graph definition.
  const rpc::graph::GraphDefPb& graph_def() const override {
    return graph_def_;
  }

  // Mutable access to the stored graph definition.
  rpc::graph::GraphDefPb& mutable_graph_def() override { return graph_def_; }

  // Unsupported: always returns kInvalidOperationError. The message names
  // the fragment type this wrapper actually holds.
  bl::result<std::shared_ptr<IFragmentWrapper>> CopyGraph(
      const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
      const std::string& copy_type) override {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "Cannot copy the ImmutableEdgecutFragment");
  }

  // Unsupported: always returns kInvalidOperationError.
  bl::result<std::unique_ptr<grape::InArchive>> ReportGraph(
      const grape::CommSpec& comm_spec, const rpc::GSParams& params) override {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "Not implemented.");
  }

  // Unsupported: always returns kInvalidOperationError.
  bl::result<std::shared_ptr<IFragmentWrapper>> ToDirected(
      const grape::CommSpec& comm_spec,
      const std::string& dst_graph_name) override {
    RETURN_GS_ERROR(
        vineyard::ErrorCode::kInvalidOperationError,
        "Cannot convert to the directed ImmutableEdgecutFragment");
  }

  // Unsupported: always returns kInvalidOperationError.
  bl::result<std::shared_ptr<IFragmentWrapper>> ToUndirected(
      const grape::CommSpec& comm_spec,
      const std::string& dst_graph_name) override {
    RETURN_GS_ERROR(
        vineyard::ErrorCode::kInvalidOperationError,
        "Cannot convert to the undirected ImmutableEdgecutFragment");
  }

  // Unsupported: always returns kInvalidOperationError.
  bl::result<std::shared_ptr<IFragmentWrapper>> CreateGraphView(
      const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
      const std::string& copy_type) override {
    RETURN_GS_ERROR(
        vineyard::ErrorCode::kInvalidOperationError,
        "Cannot generate a view over the ImmutableEdgecutFragment");
  }

 private:
  // Graph definition describing the wrapped fragment (type
  // IMMUTABLE_EDGECUT).
  rpc::graph::GraphDefPb graph_def_;
  // Shared handle to the wrapped fragment.
  std::shared_ptr<fragment_t> fragment_;
};
/*
template <typename OID_T, typename VID_T, typename VDATA_T, typename EDATA_T>
class FragmentWrapper<
grape::ImmutableVertexcutFragment<OID_T, VID_T, VDATA_T, EDATA_T>>
: public IFragmentWrapper {
using fragment_t =
grape::ImmutableVertexcutFragment<OID_T, VID_T, VDATA_T, EDATA_T>;
public:
FragmentWrapper(const std::string& id, rpc::graph::GraphDefPb graph_def,
std::shared_ptr<fragment_t> fragment)
: IFragmentWrapper(id),
graph_def_(std::move(graph_def)),
fragment_(std::move(fragment)) {
CHECK_EQ(graph_def_.graph_type(), rpc::graph::IMMUTABLE_EDGECUT);
}
std::shared_ptr<void> fragment() const override {
return std::static_pointer_cast<void>(fragment_);
}
const rpc::graph::GraphDefPb& graph_def() const override {
return graph_def_;
}
rpc::graph::GraphDefPb& mutable_graph_def() override { return graph_def_; }
bl::result<std::shared_ptr<IFragmentWrapper>> CopyGraph(
const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
const std::string& copy_type) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Cannot copy the ArrowProjectedFragment");
}
bl::result<std::unique_ptr<grape::InArchive>> ReportGraph(
const grape::CommSpec& comm_spec, const rpc::GSParams& params) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Not implemented.");
}
bl::result<std::shared_ptr<IFragmentWrapper>> ToDirected(
const grape::CommSpec& comm_spec,
const std::string& dst_graph_name) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Cannot convert to the directed DynamicProjectedFragment");
}
bl::result<std::shared_ptr<IFragmentWrapper>> ToUndirected(
const grape::CommSpec& comm_spec,
const std::string& dst_graph_name) override {
RETURN_GS_ERROR(
vineyard::ErrorCode::kInvalidOperationError,
"Cannot convert to the undirected DynamicProjectedFragment");
}
bl::result<std::shared_ptr<IFragmentWrapper>> CreateGraphView(
const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
const std::string& copy_type) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Cannot generate a view over the ArrowProjectedFragment");
}
private:
rpc::graph::GraphDefPb graph_def_;
std::shared_ptr<fragment_t> fragment_;
};
*/
#ifdef NETWORKX
/**
* @brief A specialized FragmentWrapper for DynamicFragment.
* @tparam OID_T OID type
* @tparam VID_T VID type
*/
template <>
class FragmentWrapper<DynamicFragment> : public IFragmentWrapper {
using fragment_t = DynamicFragment;
public:
FragmentWrapper(const std::string& id, rpc::graph::GraphDefPb graph_def,
std::shared_ptr<fragment_t> fragment)
: IFragmentWrapper(id),
graph_def_(std::move(graph_def)),
fragment_(std::move(fragment)) {
CHECK_EQ(graph_def_.graph_type(), rpc::graph::DYNAMIC_PROPERTY);
}
std::shared_ptr<void> fragment() const override {
return std::static_pointer_cast<void>(fragment_);
}
const rpc::graph::GraphDefPb& graph_def() const override {
return graph_def_;
}
rpc::graph::GraphDefPb& mutable_graph_def() override { return graph_def_; }
bl::result<std::unique_ptr<grape::InArchive>> ReportGraph(
const grape::CommSpec& comm_spec, const rpc::GSParams& params) override {
DynamicFragmentReporter reporter(comm_spec);
return reporter.Report(fragment_, params);
}
bl::result<std::shared_ptr<IFragmentWrapper>> CopyGraph(
const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
const std::string& copy_type) override {
// copy vertex map
auto ori_vm_ptr = fragment_->GetVertexMap();
auto new_vm_ptr =
std::make_shared<typename fragment_t::vertex_map_t>(comm_spec);
new_vm_ptr->SetPartitioner(ori_vm_ptr->GetPartitioner());
new_vm_ptr->Init();
std::vector<std::thread> copy_vm_threads(comm_spec.fnum());
for (size_t fid = 0; fid < comm_spec.fnum(); ++fid) {
copy_vm_threads[fid] = std::thread(
[&](size_t fid) {
typename fragment_t::oid_t oid;
typename fragment_t::vid_t gid{};
typename fragment_t::vid_t fvnum =
ori_vm_ptr->GetInnerVertexSize(fid);
for (typename fragment_t::vid_t lid = 0; lid < fvnum; lid++) {
ori_vm_ptr->GetOid(fid, lid, oid);
CHECK(new_vm_ptr->AddVertex(std::move(oid), gid));
}
},
fid);
}
for (auto& thrd : copy_vm_threads) {
thrd.join();
}
// copy fragment
auto dst_frag = std::make_shared<fragment_t>(new_vm_ptr);
dst_frag->CopyFrom(fragment_, copy_type);
auto dst_graph_def = graph_def_;
dst_graph_def.set_key(dst_graph_name);
auto wrapper = std::make_shared<FragmentWrapper<fragment_t>>(
dst_graph_name, dst_graph_def, dst_frag);
return std::dynamic_pointer_cast<IFragmentWrapper>(wrapper);
}
bl::result<std::shared_ptr<IFragmentWrapper>> ToDirected(
const grape::CommSpec& comm_spec,
const std::string& dst_graph_name) override {
// copy vertex map
auto ori_vm_ptr = fragment_->GetVertexMap();
auto new_vm_ptr =
std::make_shared<typename fragment_t::vertex_map_t>(comm_spec);
new_vm_ptr->SetPartitioner(ori_vm_ptr->GetPartitioner());
new_vm_ptr->Init();
std::vector<std::thread> copy_vm_threads(comm_spec.fnum());
for (size_t fid = 0; fid < comm_spec.fnum(); ++fid) {
copy_vm_threads[fid] = std::thread(
[&](size_t fid) {
typename fragment_t::oid_t oid;
typename fragment_t::vid_t gid{};
typename fragment_t::vid_t fvnum =
ori_vm_ptr->GetInnerVertexSize(fid);
for (typename fragment_t::vid_t lid = 0; lid < fvnum; lid++) {
ori_vm_ptr->GetOid(fid, lid, oid);
CHECK(new_vm_ptr->AddVertex(std::move(oid), gid));
}
},
fid);
}
for (auto& thrd : copy_vm_threads) {
thrd.join();
}
// copy fragment
auto dst_frag = std::make_shared<fragment_t>(new_vm_ptr);
dst_frag->ToDirectedFrom(fragment_);
auto dst_graph_def = graph_def_;
dst_graph_def.set_key(dst_graph_name);
auto wrapper = std::make_shared<FragmentWrapper<fragment_t>>(
dst_graph_name, dst_graph_def, dst_frag);
return std::dynamic_pointer_cast<IFragmentWrapper>(wrapper);
}
bl::result<std::shared_ptr<IFragmentWrapper>> ToUndirected(
const grape::CommSpec& comm_spec,
const std::string& dst_graph_name) override {
// copy vertex map
auto ori_vm_ptr = fragment_->GetVertexMap();
auto new_vm_ptr =
std::make_shared<typename fragment_t::vertex_map_t>(comm_spec);
new_vm_ptr->SetPartitioner(ori_vm_ptr->GetPartitioner());
new_vm_ptr->Init();
std::vector<std::thread> copy_vm_threads(comm_spec.fnum());
for (size_t fid = 0; fid < comm_spec.fnum(); ++fid) {
copy_vm_threads[fid] = std::thread(
[&](size_t fid) {
typename fragment_t::oid_t oid;
typename fragment_t::vid_t gid{};
typename fragment_t::vid_t fvnum =
ori_vm_ptr->GetInnerVertexSize(fid);
for (typename fragment_t::vid_t lid = 0; lid < fvnum; lid++) {
ori_vm_ptr->GetOid(fid, lid, oid);
CHECK(new_vm_ptr->AddVertex(std::move(oid), gid));
}
},
fid);
}
for (auto& thrd : copy_vm_threads) {
thrd.join();
}
// copy fragment
auto dst_frag = std::make_shared<fragment_t>(new_vm_ptr);
dst_frag->ToUndirectedFrom(fragment_);
auto dst_graph_def = graph_def_;
dst_graph_def.set_key(dst_graph_name);
auto wrapper = std::make_shared<FragmentWrapper<fragment_t>>(
dst_graph_name, dst_graph_def, dst_frag);
return std::dynamic_pointer_cast<IFragmentWrapper>(wrapper);
}
bl::result<std::shared_ptr<IFragmentWrapper>> CreateGraphView(
const grape::CommSpec& comm_spec, const std::string& view_graph_id,
const std::string& view_type) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Cannot generate a view over the DynamicFragment");
}
private:
rpc::graph::GraphDefPb graph_def_;
std::shared_ptr<fragment_t> fragment_;
};
/**
 * @brief A specialized FragmentWrapper for DynamicProjectedFragment.
 *
 * The wrapper exposes the fragment and its graph definition. Every other
 * operation (report, copy, direction conversion, view creation) is
 * unsupported and returns kInvalidOperationError.
 *
 * @tparam VDATA_T forwarded unchanged to DynamicProjectedFragment
 * @tparam EDATA_T forwarded unchanged to DynamicProjectedFragment
 */
template <typename VDATA_T, typename EDATA_T>
class FragmentWrapper<DynamicProjectedFragment<VDATA_T, EDATA_T>>
    : public IFragmentWrapper {
  using fragment_t = DynamicProjectedFragment<VDATA_T, EDATA_T>;

 public:
  /**
   * @param id identifier handed to the IFragmentWrapper base
   * @param graph_def graph definition; its graph_type is checked (CHECK_EQ)
   * to be DYNAMIC_PROJECTED
   * @param fragment the fragment to wrap
   */
  FragmentWrapper(const std::string& id, rpc::graph::GraphDefPb graph_def,
                  std::shared_ptr<fragment_t> fragment)
      : IFragmentWrapper(id),
        graph_def_(std::move(graph_def)),
        fragment_(std::move(fragment)) {
    CHECK_EQ(graph_def_.graph_type(), rpc::graph::DYNAMIC_PROJECTED);
  }

  // Returns the wrapped fragment as a type-erased shared pointer.
  std::shared_ptr<void> fragment() const override {
    return std::static_pointer_cast<void>(fragment_);
  }

  // Read-only access to the stored graph definition.
  const rpc::graph::GraphDefPb& graph_def() const override {
    return graph_def_;
  }

  // Mutable access to the stored graph definition.
  rpc::graph::GraphDefPb& mutable_graph_def() override { return graph_def_; }

  // Unsupported: always returns kInvalidOperationError.
  bl::result<std::unique_ptr<grape::InArchive>> ReportGraph(
      const grape::CommSpec& comm_spec, const rpc::GSParams& params) override {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "Not implemented.");
  }

  // Unsupported: always returns kInvalidOperationError.
  bl::result<std::shared_ptr<IFragmentWrapper>> CopyGraph(
      const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
      const std::string& copy_type) override {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "Cannot copy the DynamicProjectedFragment");
  }

  // Unsupported: always returns kInvalidOperationError.
  bl::result<std::shared_ptr<IFragmentWrapper>> ToDirected(
      const grape::CommSpec& comm_spec,
      const std::string& dst_graph_name) override {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "Cannot convert to the directed DynamicProjectedFragment");
  }

  // Unsupported: always returns kInvalidOperationError.
  bl::result<std::shared_ptr<IFragmentWrapper>> ToUndirected(
      const grape::CommSpec& comm_spec,
      const std::string& dst_graph_name) override {
    RETURN_GS_ERROR(
        vineyard::ErrorCode::kInvalidOperationError,
        "Cannot convert to the undirected DynamicProjectedFragment");
  }

  // Unsupported: always returns kInvalidOperationError. The message names
  // the fragment type this wrapper actually holds.
  bl::result<std::shared_ptr<IFragmentWrapper>> CreateGraphView(
      const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
      const std::string& copy_type) override {
    RETURN_GS_ERROR(
        vineyard::ErrorCode::kInvalidOperationError,
        "Cannot generate a graph view over the DynamicProjectedFragment.");
  }

 private:
  // Graph definition describing the wrapped fragment (type
  // DYNAMIC_PROJECTED).
  rpc::graph::GraphDefPb graph_def_;
  // Shared handle to the wrapped fragment.
  std::shared_ptr<fragment_t> fragment_;
};
#endif
} // namespace gs
#endif // ANALYTICAL_ENGINE_CORE_OBJECT_FRAGMENT_WRAPPER_H_