analytical_engine/core/fragment/dynamic_fragment.h (1,456 lines of code) (raw):

/** Copyright 2020 Alibaba Group Holding Limited. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef ANALYTICAL_ENGINE_CORE_FRAGMENT_DYNAMIC_FRAGMENT_H_ #define ANALYTICAL_ENGINE_CORE_FRAGMENT_DYNAMIC_FRAGMENT_H_ #ifdef NETWORKX #include <glog/logging.h> #include <algorithm> #include <cassert> #include <limits> #include <map> #include <memory> #include <ostream> #include <set> #include <string> #include <utility> #include <vector> #include "grape/fragment/basic_fragment_mutator.h" #include "grape/fragment/csr_edgecut_fragment_base.h" #include "grape/graph/de_mutable_csr.h" #include "grape/utils/bitset.h" #include "grape/utils/vertex_set.h" #include "vineyard/graph/fragment/property_graph_types.h" #include "core/config.h" #include "core/object/dynamic.h" #include "core/utils/convert_utils.h" #include "core/utils/partitioner.h" #include "proto/types.pb.h" namespace gs { struct DynamicFragmentTraits { using oid_t = dynamic::Value; using vid_t = vineyard::property_graph_types::VID_TYPE; using vdata_t = dynamic::Value; using edata_t = dynamic::Value; using nbr_t = grape::Nbr<vid_t, edata_t>; using vertex_map_t = grape::GlobalVertexMap<oid_t, vid_t>; using inner_vertices_t = grape::VertexRange<vid_t>; using outer_vertices_t = grape::VertexRange<vid_t>; using vertices_t = grape::DualVertexRange<vid_t>; using sub_vertices_t = grape::VertexVector<vid_t>; using fragment_adj_list_t = grape::FilterAdjList<vid_t, edata_t, std::function<bool(const nbr_t&)>>; using fragment_const_adj_list_t = grape::FilterConstAdjList<vid_t, edata_t, std::function<bool(const nbr_t&)>>; using csr_t = grape::DeMutableCSR<vid_t, nbr_t>; using csr_builder_t = grape::DeMutableCSRBuilder<vid_t, nbr_t>; using mirror_vertices_t = std::vector<grape::Vertex<vid_t>>; }; class DynamicFragment : public grape::CSREdgecutFragmentBase< dynamic::Value, vineyard::property_graph_types::VID_TYPE, dynamic::Value, dynamic::Value, DynamicFragmentTraits> { public: using oid_t = dynamic::Value; using vid_t = vineyard::property_graph_types::VID_TYPE; using vdata_t = dynamic::Value; using edata_t = dynamic::Value; using traits_t = DynamicFragmentTraits; using base_t = grape::CSREdgecutFragmentBase<oid_t, vid_t, vdata_t, edata_t, traits_t>; using internal_vertex_t = grape::internal::Vertex<vid_t, vdata_t>; using edge_t = grape::Edge<vid_t, edata_t>; using nbr_t = grape::Nbr<vid_t, edata_t>; using vertex_t = grape::Vertex<vid_t>; static constexpr grape::LoadStrategy load_strategy = grape::LoadStrategy::kOnlyOut; static constexpr double dense_threshold = 0.003; using vertex_map_t = typename traits_t::vertex_map_t; using partitioner_t = typename vertex_map_t::partitioner_t; using mutation_t = grape::Mutation<vid_t, vdata_t, edata_t>; using IsEdgeCut = std::true_type; using IsVertexCut = std::false_type; using inner_vertices_t = typename traits_t::inner_vertices_t; using outer_vertices_t = typename traits_t::outer_vertices_t; using vertices_t = typename traits_t::vertices_t; using fragment_adj_list_t = typename traits_t::fragment_adj_list_t; using fragment_const_adj_list_t = typename traits_t::fragment_const_adj_list_t; template <typename T> using inner_vertex_array_t = grape::VertexArray<inner_vertices_t, T>; template <typename T> using outer_vertex_array_t = grape::VertexArray<outer_vertices_t, T>; template <typename T> using vertex_array_t = grape::VertexArray<vertices_t, T>; using vertex_range_t = inner_vertices_t; explicit DynamicFragment(std::shared_ptr<vertex_map_t> vm_ptr) : grape::FragmentBase<oid_t, vid_t, vdata_t, edata_t, traits_t>(vm_ptr) {} virtual ~DynamicFragment() = default; using base_t::buildCSR; using base_t::init; using base_t::IsInnerVertexGid; void Init(fid_t fid, bool directed, std::vector<internal_vertex_t>& vertices, std::vector<edge_t>& edges) override { init(fid, directed); load_strategy_ = directed ? grape::LoadStrategy::kBothOutIn : grape::LoadStrategy::kOnlyOut; ovnum_ = 0; static constexpr vid_t invalid_vid = std::numeric_limits<vid_t>::max(); if (load_strategy_ == grape::LoadStrategy::kOnlyIn) { for (auto& e : edges) { if (IsInnerVertexGid(e.dst)) { if (!IsInnerVertexGid(e.src)) { parseOrAddOuterVertexGid(e.src); } } else { e.src = invalid_vid; } } } else if (load_strategy_ == grape::LoadStrategy::kOnlyOut) { for (auto& e : edges) { if (IsInnerVertexGid(e.src)) { if (!IsInnerVertexGid(e.dst)) { parseOrAddOuterVertexGid(e.dst); } } else { e.src = invalid_vid; } } } else if (load_strategy_ == grape::LoadStrategy::kBothOutIn) { for (auto& e : edges) { if (IsInnerVertexGid(e.src)) { if (!IsInnerVertexGid(e.dst)) { parseOrAddOuterVertexGid(e.dst); } } else { if (IsInnerVertexGid(e.dst)) { parseOrAddOuterVertexGid(e.src); } else { e.src = invalid_vid; } } } } initVertexMembersOfFragment(); initOuterVerticesOfFragment(); buildCSR(this->Vertices(), edges, load_strategy_); ivdata_.clear(); ivdata_.resize(ivnum_, dynamic::Value(rapidjson::kObjectType)); if (sizeof(internal_vertex_t) > sizeof(vid_t)) { for (auto& v : vertices) { vid_t gid = v.vid; if (id_parser_.get_fragment_id(gid) == fid_) { ivdata_[id_parser_.get_local_id(gid)] = std::move(v.vdata); } } } initSchema(); } // Init an empty fragment. void Init(fid_t fid, bool directed) { std::vector<internal_vertex_t> empty_vertices; std::vector<edge_t> empty_edges; Init(fid, directed, empty_vertices, empty_edges); } // Init fragment from arrow property fragment. void Init(fid_t fid, bool directed, std::vector<std::vector<internal_vertex_t>>& vertices, std::vector<std::vector<edge_t>>& edges, std::vector<int>& inner_oe_degree, std::vector<int>& outer_oe_degree, std::vector<int>& inner_ie_degree, std::vector<int>& outer_ie_degree, uint32_t thread_num) { init(fid, directed); load_strategy_ = directed ? grape::LoadStrategy::kBothOutIn : grape::LoadStrategy::kOnlyOut; ovnum_ = 0; if (load_strategy_ == grape::LoadStrategy::kOnlyOut) { for (auto& vec : edges) { for (auto& e : vec) { if (!IsInnerVertexGid(e.dst)) { parseOrAddOuterVertexGid(e.dst); } } } } else if (load_strategy_ == grape::LoadStrategy::kBothOutIn) { for (auto& vec : edges) { for (auto& e : vec) { if (IsInnerVertexGid(e.src)) { if (!IsInnerVertexGid(e.dst)) { parseOrAddOuterVertexGid(e.dst); } } else { parseOrAddOuterVertexGid(e.src); } } } } initVertexMembersOfFragment(); initOuterVerticesOfFragment(); buildCSRParallel(edges, inner_oe_degree, outer_oe_degree, inner_ie_degree, outer_ie_degree, thread_num); ivdata_.clear(); ivdata_.resize(ivnum_); // process vertices data parallel if (sizeof(internal_vertex_t) > sizeof(vid_t)) { parallel_for( vertices.begin(), vertices.end(), [&](uint32_t tid, std::vector<internal_vertex_t>& vs) { for (auto& v : vs) { ivdata_[v.vid] = std::move(v.vdata); } }, thread_num, 1); } initSchema(); } using base_t::Gid2Lid; using base_t::ie_; using base_t::oe_; using base_t::vm_ptr_; void Mutate(mutation_t& mutation) { vertex_t v; if (!mutation.vertices_to_remove.empty() && static_cast<double>(mutation.vertices_to_remove.size()) / static_cast<double>(this->GetVerticesNum()) < 0.1) { std::set<vertex_t> sparse_set; for (auto gid : mutation.vertices_to_remove) { if (Gid2Vertex(gid, v) && IsAliveVertex(v)) { if (IsInnerVertex(v)) { if (load_strategy_ == grape::LoadStrategy::kBothOutIn) { ie_.remove_vertex(v.GetValue()); } oe_.remove_vertex(v.GetValue()); iv_alive_.reset_bit(v.GetValue()); --alive_ivnum_; is_selfloops_.reset_bit(v.GetValue()); } else { ov_alive_.reset_bit(outerVertexLidToIndex(v.GetValue())); } sparse_set.insert(v); } } if (!sparse_set.empty()) { auto func = [&sparse_set](vid_t i, const nbr_t& e) { return sparse_set.find(e.neighbor) != sparse_set.end(); }; if (load_strategy_ == grape::LoadStrategy::kBothOutIn) { ie_.remove_if(func); } oe_.remove_if(func); } } else if (!mutation.vertices_to_remove.empty()) { grape::DenseVertexSet<vertices_t> dense_bitset(Vertices()); for (auto gid : mutation.vertices_to_remove) { if (Gid2Vertex(gid, v) && IsAliveVertex(v)) { if (IsInnerVertex(v)) { if (load_strategy_ == grape::LoadStrategy::kBothOutIn) { ie_.remove_vertex(v.GetValue()); } oe_.remove_vertex(v.GetValue()); iv_alive_.reset_bit(v.GetValue()); --alive_ivnum_; is_selfloops_.reset_bit(v.GetValue()); } else { ov_alive_.reset_bit(outerVertexLidToIndex(v.GetValue())); } dense_bitset.Insert(v); } } auto func = [&dense_bitset](vid_t i, const nbr_t& e) { return dense_bitset.Exist(e.neighbor); }; if (!dense_bitset.Empty()) { if (load_strategy_ == grape::LoadStrategy::kBothOutIn) { ie_.remove_if(func); } oe_.remove_if(func); } } if (!mutation.edges_to_remove.empty()) { removeEdges(mutation.edges_to_remove); } if (!mutation.edges_to_update.empty()) { for (auto& e : mutation.edges_to_update) { if (IsInnerVertexGid(e.src)) { e.src = id_parser_.get_local_id(e.src); } else { e.src = parseOuterVertexGid(e.src); } if (IsInnerVertexGid(e.dst)) { e.dst = id_parser_.get_local_id(e.dst); } else { e.dst = parseOuterVertexGid(e.dst); } } updateEdges(mutation.edges_to_update); } { auto& edges_to_add = mutation.edges_to_add; static constexpr vid_t invalid_vid = std::numeric_limits<vid_t>::max(); vid_t old_ovnum = ovgid_.size(); // parseOrAddOuterVertexGid will update ovnum_ for (auto& e : edges_to_add) { if (IsInnerVertexGid(e.src)) { e.src = id_parser_.get_local_id(e.src); if (IsInnerVertexGid(e.dst)) { e.dst = id_parser_.get_local_id(e.dst); } else { mutation.vertices_to_add.emplace_back(e.dst); e.dst = parseOrAddOuterVertexGid(e.dst); } } else { if (IsInnerVertexGid(e.dst)) { mutation.vertices_to_add.emplace_back(e.src); e.src = parseOrAddOuterVertexGid(e.src); e.dst = id_parser_.get_local_id(e.dst); } else { e.src = invalid_vid; } } } vid_t new_ivnum = vm_ptr_->GetInnerVertexSize(fid_); vid_t new_ovnum = ovgid_.size(); assert(new_ovnum == ovnum_); assert(new_ivnum >= ivnum_ && new_ovnum >= old_ovnum); is_selfloops_.resize(new_ivnum); oe_.add_vertices(new_ivnum - ivnum_, new_ovnum - old_ovnum); ie_.add_vertices(new_ivnum - ivnum_, new_ovnum - old_ovnum); this->ivnum_ = new_ivnum; if (old_ovnum != new_ovnum) { initOuterVerticesOfFragment(); } if (!edges_to_add.empty()) { addEdges(edges_to_add); } this->inner_vertices_.SetRange(0, new_ivnum); this->outer_vertices_.SetRange(id_parser_.max_local_id() - new_ovnum, id_parser_.max_local_id()); this->vertices_.SetRange(0, new_ivnum, id_parser_.max_local_id() - new_ovnum, id_parser_.max_local_id()); } ivdata_.resize(this->ivnum_, dynamic::Value(rapidjson::kObjectType)); iv_alive_.resize(this->ivnum_); ov_alive_.resize(this->ovnum_); alive_ovnum_ = this->ovnum_; for (auto& v : mutation.vertices_to_add) { vid_t lid; if (IsInnerVertexGid(v.vid)) { this->InnerVertexGid2Lid(v.vid, lid); ivdata_[lid].Update(v.vdata); if (iv_alive_.get_bit(lid) == false) { iv_alive_.set_bit(lid); ++alive_ivnum_; } } else { if (this->OuterVertexGid2Lid(v.vid, lid)) { auto index = outerVertexLidToIndex(lid); if (ov_alive_.get_bit(index) == false) { ov_alive_.set_bit(index); } } } } for (auto& v : mutation.vertices_to_update) { vid_t lid; if (IsInnerVertexGid(v.vid)) { this->InnerVertexGid2Lid(v.vid, lid); ivdata_[lid] = std::move(v.vdata); } } } void PrepareToRunApp(const grape::CommSpec& comm_spec, grape::PrepareConf conf) override { base_t::PrepareToRunApp(comm_spec, conf); if (conf.need_split_edges_by_fragment) { LOG(ERROR) << "MutableEdgecutFragment cannot split edges by fragment"; } else if (conf.need_split_edges) { splitEdges(comm_spec); } } inline size_t GetEdgeNum() const override { size_t res = this->directed_ ? oe_.head_edge_num() + ie_.head_edge_num() : oe_.head_edge_num() + is_selfloops_.count(); return res; } inline size_t GetOutgoingEdgeNum() const { return oe_.head_edge_num() + is_selfloops_.count(); } inline size_t GetIncomingEdgeNum() const { return ie_.head_edge_num() + is_selfloops_.count(); } using base_t::InnerVertices; using base_t::IsInnerVertex; using base_t::OuterVertices; vid_t GetVerticesNum() const { return alive_ivnum_ + alive_ovnum_; } vid_t GetInnerVerticesNum() const { return alive_ivnum_; } vid_t GetOuterVerticesNum() const { return alive_ovnum_; } inline const vdata_t& GetData(const vertex_t& v) const override { CHECK(IsInnerVertex(v)); return ivdata_[v.GetValue()]; } inline void SetData(const vertex_t& v, const vdata_t& val) override { CHECK(IsInnerVertex(v)); ivdata_[v.GetValue()] = val; } bool OuterVertexGid2Lid(vid_t gid, vid_t& lid) const override { auto iter = ovg2i_.find(gid); if (iter != ovg2i_.end()) { lid = iter->second; return true; } else { return false; } } vid_t GetOuterVertexGid(vertex_t v) const override { return ovgid_[outerVertexLidToIndex(v.GetValue())]; } bool IsOuterVertexGid(vid_t gid) const { return ovg2i_.find(gid) != ovg2i_.end(); } inline bool Gid2Vertex(const vid_t& gid, vertex_t& v) const override { fid_t fid = id_parser_.get_fragment_id(gid); if (fid == fid_) { v.SetValue(id_parser_.get_local_id(gid)); return true; } else { auto iter = ovg2i_.find(gid); if (iter != ovg2i_.end()) { v.SetValue(iter->second); return true; } else { return false; } } } inline vid_t Vertex2Gid(const vertex_t& v) const override { if (IsInnerVertex(v)) { return id_parser_.generate_global_id(fid_, v.GetValue()); } else { return ovgid_[outerVertexLidToIndex(v.GetValue())]; } } void ClearGraph(std::shared_ptr<vertex_map_t> vm_ptr) { vm_ptr_.reset(); vm_ptr_ = vm_ptr; Init(fid_, directed_); } void ClearEdges() { if (load_strategy_ == grape::LoadStrategy::kBothOutIn) { ie_.clear_edges(); } oe_.clear_edges(); // clear outer_vertices map ovgid_.clear(); ovg2i_.clear(); ov_alive_.clear(); this->ovnum_ = 0; this->alive_ovnum_ = 0; is_selfloops_.clear(); } void CopyFrom(std::shared_ptr<DynamicFragment> source, const std::string& copy_type = "identical") { init(source->fid_, source->directed_); load_strategy_ = source->load_strategy_; copyVertices(source); // copy edges auto vnum = id_parser_.max_local_id(); ie_.init_head_and_tail(0, vnum); oe_.init_head_and_tail(0, vnum); ie_.add_vertices(ivnum_, ovnum_); oe_.add_vertices(ivnum_, ovnum_); if (copy_type == "identical") { std::vector<int> inner_oe_degree_to_add(ivnum_, 0), inner_ie_degree_to_add(ivnum_, 0), outer_oe_degree_to_add(ovnum_, 0), outer_ie_degree_to_add(ovnum_, 0); for (vid_t i = 0; i < ivnum_; ++i) { inner_oe_degree_to_add[i] = source->oe_.degree(i); inner_ie_degree_to_add[i] = source->ie_.degree(i); } for (vid_t i = 0; i < ovnum_; ++i) { outer_oe_degree_to_add[i] = source->oe_.degree(outerVertexIndexToLid(i)); outer_ie_degree_to_add[i] = source->ie_.degree(outerVertexIndexToLid(i)); } oe_.reserve_edges_dense(inner_oe_degree_to_add, outer_oe_degree_to_add); ie_.reserve_edges_dense(inner_ie_degree_to_add, outer_ie_degree_to_add); for (vid_t i = 0; i < ivnum_; ++i) { auto ie_begin = source->ie_.get_begin(i); auto ie_end = source->ie_.get_end(i); auto oe_begin = source->oe_.get_begin(i); auto oe_end = source->oe_.get_end(i); for (auto iter = ie_begin; iter != ie_end; ++iter) { ie_.put_edge(i, *iter); } for (auto iter = oe_begin; iter != oe_end; ++iter) { oe_.put_edge(i, *iter); } } for (vid_t i = outerVertexIndexToLid(ovnum_ - 1); i < vnum; ++i) { auto ie_begin = source->ie_.get_begin(i); auto ie_end = source->ie_.get_end(i); auto oe_begin = source->oe_.get_begin(i); auto oe_end = source->oe_.get_end(i); for (auto iter = ie_begin; iter != ie_end; ++iter) { ie_.put_edge(i, *iter); } for (auto iter = oe_begin; iter != oe_end; ++iter) { oe_.put_edge(i, *iter); } } } else if (copy_type == "reverse") { assert(directed_); std::vector<int> inner_oe_degree_to_add(ivnum_, 0), inner_ie_degree_to_add(ivnum_, 0), outer_oe_degree_to_add(ovnum_, 0), outer_ie_degree_to_add(ovnum_, 0); for (vid_t i = 0; i < ivnum_; ++i) { inner_oe_degree_to_add[i] = source->ie_.degree(i); inner_ie_degree_to_add[i] = source->oe_.degree(i); } for (vid_t i = 0; i < ovnum_; ++i) { outer_oe_degree_to_add[i] = source->ie_.degree(outerVertexIndexToLid(i)); outer_ie_degree_to_add[i] = source->oe_.degree(outerVertexIndexToLid(i)); } oe_.reserve_edges_dense(inner_oe_degree_to_add, outer_oe_degree_to_add); ie_.reserve_edges_dense(inner_ie_degree_to_add, outer_ie_degree_to_add); for (vid_t i = 0; i < ivnum_; ++i) { auto ie_begin = source->ie_.get_begin(i); auto ie_end = source->ie_.get_end(i); auto oe_begin = source->oe_.get_begin(i); auto oe_end = source->oe_.get_end(i); for (auto iter = oe_begin; iter != oe_end; ++iter) { ie_.put_edge(i, *iter); } for (auto iter = ie_begin; iter != ie_end; ++iter) { oe_.put_edge(i, *iter); } } for (vid_t i = outerVertexIndexToLid(ovnum_ - 1); i < vnum; ++i) { auto ie_begin = source->ie_.get_begin(i); auto ie_end = source->ie_.get_end(i); auto oe_begin = source->oe_.get_begin(i); auto oe_end = source->oe_.get_end(i); for (auto iter = oe_begin; iter != oe_end; ++iter) { ie_.put_edge(i, *iter); } for (auto iter = ie_begin; iter != ie_end; ++iter) { oe_.put_edge(i, *iter); } } } else { LOG(ERROR) << "Unsupported copy type: " << copy_type; } this->schema_.CopyFrom(source->schema_); } // generate directed graph from original undirected graph. void ToDirectedFrom(std::shared_ptr<DynamicFragment> source) { assert(!source->directed_); init(source->fid_, true); load_strategy_ = grape::LoadStrategy::kBothOutIn; copyVertices(source); // both inner and outer vertices with the empty slots auto vnum = id_parser_.max_local_id(); ie_.init_head_and_tail(0, vnum); oe_.init_head_and_tail(0, vnum); ie_.add_vertices(ivnum_, ovnum_); oe_.add_vertices(ivnum_, ovnum_); std::vector<int> inner_degree_to_add(ivnum_, 0), outer_degree_to_add(ovnum_, 0); for (vid_t i = 0; i < ivnum_; ++i) { inner_degree_to_add[i] = source->oe_.degree(i); } for (vid_t i = 0; i < ovnum_; ++i) { outer_degree_to_add[i] = source->oe_.degree(outerVertexIndexToLid(i)); } ie_.reserve_edges_dense(inner_degree_to_add, outer_degree_to_add); oe_.reserve_edges_dense(inner_degree_to_add, outer_degree_to_add); for (vid_t i = 0; i < ivnum_; ++i) { auto begin = source->oe_.get_begin(i); auto end = source->oe_.get_end(i); for (auto iter = begin; iter != end; ++iter) { ie_.put_edge(i, *iter); oe_.put_edge(i, *iter); } } this->schema_.CopyFrom(source->schema_); } // generate undirected graph from original directed graph. void ToUndirectedFrom(std::shared_ptr<DynamicFragment> source) { assert(source->directed_); init(source->fid_, false); load_strategy_ = grape::LoadStrategy::kOnlyOut; copyVertices(source); // both inner and outer vertices with the empty slots // only use oe_ in the undirected graph auto vnum = id_parser_.max_local_id(); oe_.init_head_and_tail(0, vnum); oe_.add_vertices(ivnum_, ovnum_); mutation_t mutation; vid_t gid; for (auto& v : source->InnerVertices()) { gid = Vertex2Gid(v); for (const auto& e : source->GetOutgoingAdjList(v)) { mutation.edges_to_add.emplace_back(gid, Vertex2Gid(e.neighbor), e.data); } for (const auto& e : source->GetIncomingAdjList(v)) { if (IsOuterVertex(e.neighbor)) { mutation.edges_to_add.emplace_back(gid, Vertex2Gid(e.neighbor), e.data); } } } Mutate(mutation); this->schema_.CopyFrom(source->schema_); } // induce a subgraph that contains the induced_vertices and the edges between // those vertices or a edge subgraph that contains the induced_edges and the // nodes incident to induced_edges. void InduceSubgraph( std::shared_ptr<DynamicFragment> source, const std::vector<oid_t>& induced_vertices, const std::vector<std::pair<oid_t, oid_t>>& induced_edges) { Init(source->fid_, source->directed_); mutation_t mutation; if (induced_edges.empty()) { induceFromVertices(source, induced_vertices, mutation.edges_to_add); } else { induceFromEdges(source, induced_edges, mutation.edges_to_add); } Mutate(mutation); } inline bool Oid2Gid(const oid_t& oid, vid_t& gid) const { return vm_ptr_->_GetGid(oid, gid); } inline size_t selfloops_num() const { return is_selfloops_.count(); } inline bool HasNode(const oid_t& node) const { vid_t gid; return this->vm_ptr_->_GetGid(fid_, node, gid) && iv_alive_.get_bit(id_parser_.get_local_id(gid)); } inline bool HasEdge(const oid_t& u, const oid_t& v) const { vid_t uid, vid; if (vm_ptr_->_GetGid(u, uid) && vm_ptr_->_GetGid(v, vid)) { vid_t ulid, vlid; if (IsInnerVertexGid(uid) && InnerVertexGid2Lid(uid, ulid) && Gid2Lid(vid, vlid) && iv_alive_.get_bit(ulid)) { auto iter = oe_.binary_find(ulid, vlid); if (iter != oe_.get_end(ulid)) { return true; } } else if (IsInnerVertexGid(vid) && InnerVertexGid2Lid(vid, vlid) && Gid2Lid(uid, ulid) && iv_alive_.get_bit(vlid)) { auto iter = directed_ ? ie_.binary_find(vlid, ulid) : oe_.binary_find(vlid, ulid); auto end = directed_ ? ie_.get_end(vlid) : oe_.get_end(vlid); if (iter != end) { return true; } } } return false; } inline bool GetEdgeData(const oid_t& u_oid, const oid_t& v_oid, edata_t& data) const { vid_t uid, vid; if (vm_ptr_->_GetGid(u_oid, uid) && vm_ptr_->_GetGid(v_oid, vid)) { vid_t ulid, vlid; if (IsInnerVertexGid(uid) && InnerVertexGid2Lid(uid, ulid) && Gid2Lid(vid, vlid) && iv_alive_.get_bit(ulid)) { auto iter = oe_.binary_find(ulid, vlid); if (iter != oe_.get_end(ulid)) { data = iter->data; return true; } } else if (IsInnerVertexGid(vid) && InnerVertexGid2Lid(vid, vlid) && Gid2Lid(uid, ulid) && iv_alive_.get_bit(vlid)) { auto iter = directed_ ? ie_.binary_find(vlid, ulid) : oe_.binary_find(vlid, ulid); auto end = directed_ ? ie_.get_end(vlid) : oe_.get_end(vlid); if (iter != end) { data = iter->data; return true; } } } return false; } inline bool IsAliveInnerVertex(const vertex_t& v) const { return iv_alive_.get_bit(v.GetValue()); } inline bool IsAliveVertex(const vertex_t& v) const { return IsInnerVertex(v) ? iv_alive_.get_bit(v.GetValue()) : ov_alive_.get_bit(outerVertexLidToIndex(v.GetValue())); } const dynamic::Value& GetSchema() { return schema_; } public: using base_t::GetOutgoingAdjList; inline adj_list_t GetIncomingAdjList(const vertex_t& v) override { if (!this->directed_) { return adj_list_t(oe_.get_begin(v.GetValue()), oe_.get_end(v.GetValue())); } return adj_list_t(ie_.get_begin(v.GetValue()), ie_.get_end(v.GetValue())); } inline const_adj_list_t GetIncomingAdjList(const vertex_t& v) const override { if (!this->directed_) { return const_adj_list_t(oe_.get_begin(v.GetValue()), oe_.get_end(v.GetValue())); } return const_adj_list_t(ie_.get_begin(v.GetValue()), ie_.get_end(v.GetValue())); } fragment_adj_list_t GetOutgoingAdjList(const vertex_t& v, fid_t dst_fid) override { return fragment_adj_list_t( get_oe_begin(v), get_oe_end(v), [this, dst_fid](const nbr_t& nbr) { return this->GetFragId(nbr.get_neighbor()) == dst_fid; }); } fragment_const_adj_list_t GetOutgoingAdjList(const vertex_t& v, fid_t dst_fid) const override { return fragment_const_adj_list_t( get_oe_begin(v), get_oe_end(v), [this, dst_fid](const nbr_t& nbr) { return this->GetFragId(nbr.get_neighbor()) == dst_fid; }); } fragment_adj_list_t GetIncomingAdjList(const vertex_t& v, fid_t dst_fid) override { if (!this->directed_) { return fragment_adj_list_t( get_oe_begin(v), get_oe_end(v), [this, dst_fid](const nbr_t& nbr) { return this->GetFragId(nbr.get_neighbor()) == dst_fid; }); } return fragment_adj_list_t( get_ie_begin(v), get_ie_end(v), [this, dst_fid](const nbr_t& nbr) { return this->GetFragId(nbr.get_neighbor()) == dst_fid; }); } fragment_const_adj_list_t GetIncomingAdjList(const vertex_t& v, fid_t dst_fid) const override { if (!this->directed_) { return fragment_const_adj_list_t( get_oe_begin(v), get_oe_end(v), [this, dst_fid](const nbr_t& nbr) { return this->GetFragId(nbr.get_neighbor()) == dst_fid; }); } return fragment_const_adj_list_t( get_ie_begin(v), get_ie_end(v), [this, dst_fid](const nbr_t& nbr) { return this->GetFragId(nbr.get_neighbor()) == dst_fid; }); } public: using base_t::get_ie_begin; using base_t::get_ie_end; using base_t::get_oe_begin; using base_t::get_oe_end; public: using adj_list_t = typename base_t::adj_list_t; using const_adj_list_t = typename base_t::const_adj_list_t; inline adj_list_t GetIncomingInnerVertexAdjList(const vertex_t& v) override { assert(IsInnerVertex(v)); return adj_list_t(get_ie_begin(v), iespliter_[v]); } inline const_adj_list_t GetIncomingInnerVertexAdjList( const vertex_t& v) const override { assert(IsInnerVertex(v)); return const_adj_list_t(get_ie_begin(v), iespliter_[v]); } inline adj_list_t GetIncomingOuterVertexAdjList(const vertex_t& v) override { assert(IsInnerVertex(v)); return adj_list_t(iespliter_[v], get_ie_end(v)); } inline const_adj_list_t GetIncomingOuterVertexAdjList( const vertex_t& v) const override { assert(IsInnerVertex(v)); return const_adj_list_t(iespliter_[v], get_ie_end(v)); } inline adj_list_t GetOutgoingInnerVertexAdjList(const vertex_t& v) override { assert(IsInnerVertex(v)); return adj_list_t(get_oe_begin(v), oespliter_[v]); } inline const_adj_list_t GetOutgoingInnerVertexAdjList( const vertex_t& v) const override { assert(IsInnerVertex(v)); return const_adj_list_t(get_oe_begin(v), oespliter_[v]); } inline adj_list_t GetOutgoingOuterVertexAdjList(const vertex_t& v) override { assert(IsInnerVertex(v)); return adj_list_t(oespliter_[v], get_oe_end(v)); } inline const_adj_list_t GetOutgoingOuterVertexAdjList( const vertex_t& v) const override { assert(IsInnerVertex(v)); return const_adj_list_t(oespliter_[v], get_oe_end(v)); } private: inline vid_t outerVertexLidToIndex(vid_t lid) const { return id_parser_.max_local_id() - lid - 1; } inline vid_t outerVertexIndexToLid(vid_t index) const { return id_parser_.max_local_id() - index - 1; } void splitEdges(const grape::CommSpec& comm_spec) { auto& inner_vertices = InnerVertices(); iespliter_.Init(inner_vertices); oespliter_.Init(inner_vertices); int concurrency = (std::thread::hardware_concurrency() + comm_spec.local_num() - 1) / comm_spec.local_num(); vineyard::parallel_for( static_cast<vid_t>(0), static_cast<vid_t>(inner_vertices.size()), [this, &inner_vertices](const vid_t& offset) { vertex_t v = *(inner_vertices.begin() + offset); size_t inner_neighbor_count = 0; auto ie = GetIncomingAdjList(v); for (auto& e : ie) { if (IsInnerVertex(e.neighbor)) { ++inner_neighbor_count; } } iespliter_[v] = get_ie_begin(v) + inner_neighbor_count; inner_neighbor_count = 0; auto oe = GetOutgoingAdjList(v); for (auto& e : oe) { if (IsInnerVertex(e.neighbor)) { ++inner_neighbor_count; } } oespliter_[v] = get_oe_begin(v) + inner_neighbor_count; }, concurrency, 1024); } vid_t parseOrAddOuterVertexGid(vid_t gid) { auto iter = ovg2i_.find(gid); if (iter != ovg2i_.end()) { return iter->second; } else { ++ovnum_; vid_t lid = id_parser_.max_local_id() - ovnum_; ovgid_.push_back(gid); ovg2i_.emplace(gid, lid); return lid; } } vid_t parseOuterVertexGid(vid_t gid) { auto iter = ovg2i_.find(gid); if (iter != ovg2i_.end()) { return iter->second; } else { assert(false); return (-1); } } void initOuterVerticesOfFragment() { outer_vertices_of_frag_.resize(fnum_); for (auto& vec : outer_vertices_of_frag_) { vec.clear(); } for (vid_t i = 0; i < ovnum_; ++i) { fid_t fid = id_parser_.get_fragment_id(ovgid_[i]); outer_vertices_of_frag_[fid].push_back( vertex_t(outerVertexIndexToLid(i))); } } void addEdges(std::vector<edge_t>& edges) { double rate = 0; if (directed_) { rate = static_cast<double>(edges.size()) / static_cast<double>(oe_.edge_num()); } else { rate = 2.0 * static_cast<double>(edges.size()) / static_cast<double>(oe_.edge_num()); } if (rate < dense_threshold) { addEdgesSparse(edges); } else { addEdgesDense(edges); } } void addEdgesDense(std::vector<edge_t>& edges) { static constexpr vid_t invalid_vid = std::numeric_limits<vid_t>::max(); if (load_strategy_ == grape::LoadStrategy::kBothOutIn) { std::vector<int> inner_oe_degree_to_add(ivnum_, 0), inner_ie_degree_to_add(ivnum_, 0), outer_oe_degree_to_add(ovnum_, 0), outer_ie_degree_to_add(ovnum_, 0); // reserve edges for (auto& e : edges) { if (e.src == invalid_vid) { continue; } if (e.src < ivnum_) { ++inner_oe_degree_to_add[e.src]; } else { ++outer_oe_degree_to_add[outerVertexLidToIndex(e.src)]; } if (e.dst < ivnum_) { ++inner_ie_degree_to_add[e.dst]; } else { ++outer_ie_degree_to_add[outerVertexLidToIndex(e.dst)]; } } oe_.reserve_edges_dense(inner_oe_degree_to_add, outer_oe_degree_to_add); ie_.reserve_edges_dense(inner_ie_degree_to_add, outer_ie_degree_to_add); // add edges std::fill(inner_oe_degree_to_add.begin(), inner_oe_degree_to_add.end(), 0); std::fill(outer_oe_degree_to_add.begin(), outer_oe_degree_to_add.end(), 0); std::fill(inner_ie_degree_to_add.begin(), inner_ie_degree_to_add.end(), 0); std::fill(outer_ie_degree_to_add.begin(), outer_ie_degree_to_add.end(), 0); for (auto& e : edges) { if (e.src == invalid_vid) { continue; } if (updateOrAddEdgeOutIn(e)) { if (e.src < ivnum_) { ++inner_oe_degree_to_add[e.src]; } else { ++outer_oe_degree_to_add[outerVertexLidToIndex(e.src)]; } if (e.dst < ivnum_) { ++inner_ie_degree_to_add[e.dst]; } else { ++outer_ie_degree_to_add[outerVertexLidToIndex(e.dst)]; } } } oe_.sort_neighbors_dense(inner_oe_degree_to_add, outer_oe_degree_to_add); ie_.sort_neighbors_dense(inner_ie_degree_to_add, outer_ie_degree_to_add); } else { std::vector<int> inner_oe_degree_to_add(ivnum_, 0), outer_oe_degree_to_add(ovnum_, 0); // reserve edges for (auto& e : edges) { if (e.src == invalid_vid) { continue; } assert(!(e.src >= ivnum_ && e.dst >= ivnum_)); if (e.src < ivnum_) { ++inner_oe_degree_to_add[e.src]; } else { ++outer_oe_degree_to_add[outerVertexLidToIndex(e.src)]; } if (e.dst < ivnum_) { ++inner_oe_degree_to_add[e.dst]; } else { ++outer_oe_degree_to_add[outerVertexLidToIndex(e.dst)]; } } oe_.reserve_edges_dense(inner_oe_degree_to_add, outer_oe_degree_to_add); // add edges std::fill(inner_oe_degree_to_add.begin(), inner_oe_degree_to_add.end(), 0); std::fill(outer_oe_degree_to_add.begin(), outer_oe_degree_to_add.end(), 0); for (auto& e : edges) { if (e.src == invalid_vid) { continue; } if (updateOrAddEdgeOut(e)) { if (e.src < ivnum_) { ++inner_oe_degree_to_add[e.src]; } else { ++outer_oe_degree_to_add[outerVertexLidToIndex(e.src)]; } if (e.dst < ivnum_ && e.src != e.dst) { ++inner_oe_degree_to_add[e.dst]; } else if (e.src != e.dst) { ++outer_oe_degree_to_add[outerVertexLidToIndex(e.dst)]; } } } oe_.sort_neighbors_dense(inner_oe_degree_to_add, outer_oe_degree_to_add); } } void addEdgesSparse(std::vector<edge_t>& edges) { static constexpr vid_t invalid_vid = std::numeric_limits<vid_t>::max(); if (load_strategy_ == grape::LoadStrategy::kBothOutIn) { std::map<vid_t, int> oe_degree_to_add, ie_degree_to_add; // reserve edges for (auto& e : edges) { if (e.src == invalid_vid) { continue; } ++oe_degree_to_add[e.src]; ++ie_degree_to_add[e.dst]; } oe_.reserve_edges_sparse(oe_degree_to_add); ie_.reserve_edges_sparse(ie_degree_to_add); // add edges oe_degree_to_add.clear(); ie_degree_to_add.clear(); for (auto& e : edges) { if (e.src == invalid_vid) { continue; } if (updateOrAddEdgeOutIn(e)) { ++oe_degree_to_add[e.src]; ++ie_degree_to_add[e.dst]; } } oe_.sort_neighbors_sparse(oe_degree_to_add); ie_.sort_neighbors_sparse(ie_degree_to_add); } else { std::map<vid_t, int> oe_degree_to_add; // reserve edges for (auto& e : edges) { if (e.src == invalid_vid) { continue; } ++oe_degree_to_add[e.src]; ++oe_degree_to_add[e.dst]; } oe_.reserve_edges_sparse(oe_degree_to_add); // add edges oe_degree_to_add.clear(); for (auto& e : edges) { if (e.src == invalid_vid) { continue; } if (updateOrAddEdgeOut(e)) { ++oe_degree_to_add[e.src]; if (e.src != e.dst) { ++oe_degree_to_add[e.dst]; } } } oe_.sort_neighbors_sparse(oe_degree_to_add); } } // Return true if add a new edge, otherwise false. bool updateOrAddEdgeOut(const edge_t& e) { bool ret = false; // assume it just update existed edge. { auto iter = oe_.find(e.src, e.dst); if (iter == oe_.get_end(e.src)) { oe_.put_edge(e.src, nbr_t(e.dst, e.edata)); ret = true; } else { iter->data.Update(e.edata); } if (ret && e.src == e.dst) { is_selfloops_.set_bit(e.src); return ret; } } { auto iter = oe_.find(e.dst, e.src); if (iter == oe_.get_end(e.dst)) { oe_.put_edge(e.dst, nbr_t(e.src, e.edata)); ret = true; } else { iter->data.Update(e.edata); } } return ret; } // Return true if add a new edge, otherwise false. bool updateOrAddEdgeOutIn(const edge_t& e) { bool ret = false; // assume it just update existed edge. { auto iter = oe_.find(e.src, e.dst); if (iter == oe_.get_end(e.src)) { oe_.put_edge(e.src, nbr_t(e.dst, e.edata)); ret = true; } else { iter->data.Update(e.edata); } if (ret && e.src == e.dst) { is_selfloops_.set_bit(e.src); } } { auto iter = ie_.find(e.dst, e.src); if (iter == ie_.get_end(e.dst)) { ie_.put_edge(e.dst, nbr_t(e.src, e.edata)); ret = true; } else { iter->data.Update(e.edata); } } return ret; } void removeEdges(std::vector<std::pair<vid_t, vid_t>>& edges) { for (auto& e : edges) { if (!(Gid2Lid(e.first, e.first) && Gid2Lid(e.second, e.second))) { continue; } if (e.first == e.second) { this->is_selfloops_.reset_bit(e.first); } } oe_.remove_edges(edges); ie_.remove_reversed_edges(edges); if (!directed_) { oe_.remove_reversed_edges(edges); } } void updateEdges(std::vector<edge_t>& edges) { oe_.update_edges(edges); if (directed_) { ie_.update_reversed_edges(edges); } else { oe_.update_reversed_edges(edges); } } void copyVertices(std::shared_ptr<DynamicFragment>& source) { this->ivnum_ = source->ivnum_; this->ovnum_ = source->ovnum_; this->alive_ivnum_ = source->alive_ivnum_; this->alive_ovnum_ = source->alive_ovnum_; this->fnum_ = source->fnum_; this->iv_alive_.copy(source->iv_alive_); this->ov_alive_.copy(source->ov_alive_); this->is_selfloops_.copy(source->is_selfloops_); ovg2i_ = source->ovg2i_; ovgid_.resize(ovnum_); memcpy(&ovgid_[0], &(source->ovgid_[0]), ovnum_ * sizeof(vid_t)); ivdata_.clear(); ivdata_.resize(ivnum_); for (size_t i = 0; i < ivnum_; ++i) { ivdata_[i] = source->ivdata_[i]; } this->inner_vertices_.SetRange(0, ivnum_); this->outer_vertices_.SetRange(id_parser_.max_local_id() - ovnum_, id_parser_.max_local_id()); this->vertices_.SetRange(0, ivnum_, id_parser_.max_local_id() - ovnum_, id_parser_.max_local_id()); } // induce subgraph from induced_nodes void induceFromVertices(std::shared_ptr<DynamicFragment>& source, const std::vector<oid_t>& induced_vertices, std::vector<edge_t>& edges) { vertex_t vertex; vid_t gid, dst_gid; for (const auto& oid : induced_vertices) { if (source->GetVertex(oid, vertex)) { if (source->IsInnerVertex(vertex)) { // store the vertex data CHECK(vm_ptr_->_GetGid(fid_, oid, gid)); auto lid = id_parser_.get_local_id(gid); ivdata_[lid] = source->GetData(vertex); } else { continue; // ignore outer vertex. } for (const auto& e : source->GetOutgoingAdjList(vertex)) { auto dst_oid = source->GetId(e.get_neighbor()); if (std::find(induced_vertices.begin(), induced_vertices.end(), dst_oid) != induced_vertices.end()) { CHECK(Oid2Gid(dst_oid, dst_gid)); edges.emplace_back(gid, dst_gid, e.get_data()); } } if (directed_) { // filter the cross-fragment incoming edges for (const auto& e : source->GetIncomingAdjList(vertex)) { if (source->IsOuterVertex(e.get_neighbor())) { auto dst_oid = source->GetId(e.get_neighbor()); if (std::find(induced_vertices.begin(), induced_vertices.end(), dst_oid) != induced_vertices.end()) { CHECK(Oid2Gid(dst_oid, dst_gid)); edges.emplace_back(dst_gid, gid, e.get_data()); } } } } } } } // induce edge_subgraph from induced_edges void induceFromEdges( std::shared_ptr<DynamicFragment>& source, const std::vector<std::pair<oid_t, oid_t>>& induced_edges, std::vector<edge_t>& edges) { vertex_t vertex; vid_t gid, dst_gid; edata_t edata; for (auto& e : induced_edges) { const auto& src_oid = e.first; const auto& dst_oid = e.second; if (source->HasEdge(src_oid, dst_oid)) { if (vm_ptr_->_GetGid(fid_, src_oid, gid)) { // src is inner vertex auto lid = id_parser_.get_local_id(gid); CHECK(source->GetVertex(src_oid, vertex)); ivdata_[lid] = source->GetData(vertex); CHECK(vm_ptr_->_GetGid(dst_oid, dst_gid)); CHECK(source->GetEdgeData(src_oid, dst_oid, edata)); edges.emplace_back(gid, dst_gid, edata); if (gid != dst_gid && id_parser_.get_fragment_id(dst_gid) == fid_) { // dst is inner vertex too CHECK(source->GetVertex(dst_oid, vertex)); ivdata_[id_parser_.get_local_id(dst_gid)] = source->GetData(vertex); } } else if (vm_ptr_->_GetGid(fid_, dst_oid, dst_gid)) { // dst is inner vertex but src is outer vertex CHECK(source->GetVertex(dst_oid, vertex)); ivdata_[id_parser_.get_local_id(dst_gid)] = source->GetData(vertex); CHECK(vm_ptr_->_GetGid(src_oid, gid)); source->GetEdgeData(src_oid, dst_oid, edata); if (directed_) { edges.emplace_back(gid, dst_gid, edata); } else { edges.emplace_back(dst_gid, gid, edata); } } } } } void initVertexMembersOfFragment() { alive_ivnum_ = ivnum_; alive_ovnum_ = ovnum_; iv_alive_.init(ivnum_); ov_alive_.init(ovnum_); for (size_t i = 0; i < ivnum_; i++) { iv_alive_.set_bit(i); } for (size_t i = 0; i < ovnum_; i++) { ov_alive_.set_bit(i); } is_selfloops_.init(ivnum_); this->inner_vertices_.SetRange(0, ivnum_); this->outer_vertices_.SetRange(id_parser_.max_local_id() - ovnum_, id_parser_.max_local_id()); this->vertices_.SetRange(0, ivnum_, id_parser_.max_local_id() - ovnum_, id_parser_.max_local_id()); } void buildCSRParallel(std::vector<std::vector<edge_t>>& edges, const std::vector<int>& inner_oe_degree, const std::vector<int>& outer_oe_degree, const std::vector<int>& inner_ie_degree, const std::vector<int>& outer_ie_degree, uint32_t thread_num) { auto vnum = id_parser_.max_local_id(); ie_.init_head_and_tail(0, vnum); oe_.init_head_and_tail(0, vnum); oe_.add_vertices(ivnum_, ovnum_); ie_.add_vertices(ivnum_, ovnum_); // parse edges, global id to local id parallel_for( edges.begin(), edges.end(), [&](uint32_t tid, std::vector<edge_t>& es) { if (load_strategy_ == grape::LoadStrategy::kOnlyOut) { for (auto& e : es) { CHECK(InnerVertexGid2Lid(e.src, e.src)); CHECK(Gid2Lid(e.dst, e.dst)); } } else { for (auto& e : es) { CHECK(Gid2Lid(e.src, e.src)); CHECK(Gid2Lid(e.dst, e.dst)); } } }, thread_num, 1); // insert the edges insertEdgesParallel(edges, inner_oe_degree, outer_oe_degree, inner_ie_degree, outer_ie_degree, thread_num); } void insertEdgesParallel(std::vector<std::vector<edge_t>>& edges, const std::vector<int>& inner_oe_degree, const std::vector<int>& outer_oe_degree, const std::vector<int>& inner_ie_degree, const std::vector<int>& outer_ie_degree, uint32_t thread_num) { auto insert_edges_out_in = [&](uint32_t tid, std::vector<edge_t>& es) { dynamic::Value tmp_data; // avoid to use default allocator on parallel for (auto& e : es) { if (e.src < ivnum_) { if (e.dst < ivnum_) { tmp_data.CopyFrom(e.edata, (*allocators_)[tid]); nbr_t nbr(e.dst, std::move(tmp_data)); oe_.put_edge(e.src, std::move(nbr)); } else { // avoid copy nbr_t nbr(e.dst, std::move(e.edata)); oe_.put_edge(e.src, std::move(nbr)); } } else { nbr_t nbr(e.src, std::move(e.edata)); ie_.put_edge(e.dst, std::move(nbr)); } } }; auto insert_edges_out = [&](uint32_t tid, std::vector<edge_t>& es) { for (auto& e : es) { nbr_t nbr(e.dst, std::move(e.edata)); oe_.put_edge(e.src, std::move(nbr)); } }; oe_.reserve_edges_dense(inner_oe_degree, outer_oe_degree); if (load_strategy_ == grape::LoadStrategy::kBothOutIn) { ie_.reserve_edges_dense(inner_ie_degree, outer_ie_degree); parallel_for(edges.begin(), edges.end(), insert_edges_out_in, thread_num, 1); // The incoming edges may not store in the same thread vector, // can't be parallel process. for (auto& vec : edges) { for (auto& e : vec) { if (e.src < ivnum_ && e.dst < ivnum_) { nbr_t nbr(e.src, std::move(e.edata)); ie_.put_edge(e.dst, std::move(nbr)); } } } ie_.sort_neighbors_dense(inner_ie_degree, outer_ie_degree); } else { parallel_for(edges.begin(), edges.end(), insert_edges_out, thread_num, 1); } oe_.sort_neighbors_dense(inner_oe_degree, outer_oe_degree); } void initSchema() { schema_.SetObject(); schema_.Insert("vertex", dynamic::Value(rapidjson::kObjectType)); schema_.Insert("edge", dynamic::Value(rapidjson::kObjectType)); } private: using base_t::ivnum_; vid_t ovnum_; vid_t alive_ivnum_, alive_ovnum_; using base_t::directed_; using base_t::fid_; using base_t::fnum_; using base_t::id_parser_; grape::LoadStrategy load_strategy_; ska::flat_hash_map<vid_t, vid_t> ovg2i_; std::vector<vid_t> ovgid_; grape::Array<vdata_t, grape::Allocator<vdata_t>> ivdata_; grape::Bitset iv_alive_; grape::Bitset ov_alive_; grape::Bitset is_selfloops_; grape::VertexArray<inner_vertices_t, nbr_t*> iespliter_, oespliter_; // allocators for parallel convert std::shared_ptr<std::vector<dynamic::AllocatorT>> allocators_; dynamic::Value schema_; using base_t::outer_vertices_of_frag_; template <typename _vdata_t, typename _edata_t> friend class DynamicProjectedFragment; template <typename FRAG_T> friend class ArrowToDynamicConverter; friend class DynamicFragmentMutator; }; class DynamicFragmentMutator { using fragment_t = DynamicFragment; using vertex_map_t = typename fragment_t::vertex_map_t; using oid_t = typename fragment_t::oid_t; using vid_t = typename fragment_t::vid_t; using vdata_t = typename fragment_t::vdata_t; using edata_t = typename fragment_t::edata_t; using mutation_t = typename fragment_t::mutation_t; using partitioner_t = typename vertex_map_t::partitioner_t; public: explicit DynamicFragmentMutator(const grape::CommSpec& comm_spec, std::shared_ptr<fragment_t> fragment) : comm_spec_(comm_spec), fragment_(fragment), vm_ptr_(fragment->GetVertexMap()) { comm_spec_.Dup(); } ~DynamicFragmentMutator() = default; void ModifyVertices(dynamic::Value& vertices_to_modify, const dynamic::Value& common_attrs, const rpc::ModifyType& modify_type) { mutation_t mutation; auto& partitioner = vm_ptr_->GetPartitioner(); oid_t oid; vid_t gid; vdata_t v_data; fid_t v_fid, fid = fragment_->fid(); for (auto& v : vertices_to_modify) { v_data = common_attrs; // v could be [id, attrs] or id if (v.IsArray() && v.Size() == 2 && v[1].IsObject()) { oid = std::move(v[0]); v_data.Update(vdata_t(v[1])); } else { oid = std::move(v); } v_fid = partitioner.GetPartitionId(oid); if (modify_type == rpc::NX_ADD_NODES) { vm_ptr_->AddVertex(std::move(oid), gid); if (v_data.IsObject() && !v_data.GetObject().ObjectEmpty()) { for (const auto& prop : v_data.GetObject()) { if (!fragment_->schema_["vertex"].HasMember(prop.name)) { dynamic::Value key(prop.name); fragment_->schema_["vertex"].AddMember( key, dynamic::DynamicType2RpcType(dynamic::GetType(prop.value)), dynamic::Value::allocator_); } } } if (v_fid == fid) { mutation.vertices_to_add.emplace_back(gid, std::move(v_data)); } } else { // UPDATE or DELETE, if not exist the node, continue. if (!vm_ptr_->_GetGid(v_fid, oid, gid)) { continue; } } if (modify_type == rpc::NX_UPDATE_NODES && v_fid == fid) { mutation.vertices_to_update.emplace_back(gid, std::move(v_data)); } if (modify_type == rpc::NX_DEL_NODES && (v_fid == fid || fragment_->IsOuterVertexGid(gid))) { mutation.vertices_to_remove.emplace_back(gid); } } fragment_->Mutate(mutation); } void ModifyEdges(dynamic::Value& edges_to_modify, const dynamic::Value& common_attrs, const rpc::ModifyType modify_type, const std::string weight) { edata_t e_data; oid_t src, dst; vid_t src_gid, dst_gid, lid; fid_t src_fid, dst_fid, fid = fragment_->fid(); auto& partitioner = vm_ptr_->GetPartitioner(); mutation_t mutation; mutation.edges_to_add.reserve(edges_to_modify.Size()); mutation.vertices_to_add.reserve(edges_to_modify.Size() * 2); for (auto& e : edges_to_modify) { // the edge could be [src, dst] or [srs, dst, value] or [src, dst, // {"key": val}] e_data = common_attrs; if (e.Size() == 3) { if (weight.empty()) { e_data.Update(edata_t(e[2])); } else { e_data.Insert(weight, edata_t(e[2])); } } src = std::move(e[0]); dst = std::move(e[1]); src_fid = partitioner.GetPartitionId(src); dst_fid = partitioner.GetPartitionId(dst); if (modify_type == rpc::NX_ADD_EDGES) { bool src_new_add = vm_ptr_->AddVertex(std::move(src), src_gid); bool dst_new_add = vm_ptr_->AddVertex(std::move(dst), dst_gid); if (src_fid == fid) { fragment_->InnerVertexGid2Lid(src_gid, lid); if (src_new_add || (fragment_->iv_alive_.cardinality() > lid && !fragment_->iv_alive_.get_bit(lid))) { vdata_t empty_data(rapidjson::kObjectType); mutation.vertices_to_add.emplace_back(src_gid, std::move(empty_data)); } } if (dst_fid == fid) { fragment_->InnerVertexGid2Lid(dst_gid, lid); if (dst_new_add || (fragment_->iv_alive_.cardinality() > lid && !fragment_->iv_alive_.get_bit(lid))) { vdata_t empty_data(rapidjson::kObjectType); mutation.vertices_to_add.emplace_back(dst_gid, std::move(empty_data)); } } if (e_data.IsObject() && !e_data.GetObject().ObjectEmpty()) { for (const auto& prop : e_data.GetObject()) { if (!fragment_->schema_["edge"].HasMember(prop.name)) { dynamic::Value key(prop.name); fragment_->schema_["edge"].AddMember( key, dynamic::DynamicType2RpcType(dynamic::GetType(prop.value)), dynamic::Value::allocator_); } } } } else { if (!vm_ptr_->_GetGid(src_fid, src, src_gid) || !vm_ptr_->_GetGid(dst_fid, dst, dst_gid)) { continue; } } if (modify_type == rpc::NX_ADD_EDGES) { if (src_fid == fid || dst_fid == fid) { mutation.edges_to_add.emplace_back(src_gid, dst_gid, std::move(e_data)); } } else if (modify_type == rpc::NX_DEL_EDGES) { if (src_fid == fid || dst_fid == fid) { mutation.edges_to_remove.emplace_back(src_gid, dst_gid); } } else if (modify_type == rpc::NX_UPDATE_EDGES) { if (src_fid == fid || dst_fid == fid) { mutation.edges_to_update.emplace_back(src_gid, dst_gid, std::move(e_data)); } } } fragment_->Mutate(mutation); } private: grape::CommSpec comm_spec_; std::shared_ptr<fragment_t> fragment_; std::shared_ptr<vertex_map_t> vm_ptr_; }; } // namespace gs #endif // NETWORKX #endif // ANALYTICAL_ENGINE_CORE_FRAGMENT_DYNAMIC_FRAGMENT_H_