analytical_engine/apps/python_pie/wrapper.h (339 lines of code) (raw):

/** Copyright 2020 Alibaba Group Holding Limited. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef ANALYTICAL_ENGINE_APPS_PYTHON_PIE_WRAPPER_H_ #define ANALYTICAL_ENGINE_APPS_PYTHON_PIE_WRAPPER_H_ #include <memory> #include <string> #include <unordered_map> #include <utility> #include <vector> #include "boost/lexical_cast.hpp" #include "vineyard/graph/fragment/arrow_fragment.h" #include "apps/python_pie/aggregate_factory.h" #include "core/app/property_auto_app_base.h" namespace gs { template <typename FRAG_T> class PIEAdjList; template <typename FRAG_T> class PythonPIEFragment { using fragment_t = FRAG_T; using oid_t = typename fragment_t::oid_t; using vid_t = typename fragment_t::vid_t; using eid_t = typename fragment_t::eid_t; using prop_id_t = typename fragment_t::prop_id_t; using label_id_t = typename fragment_t::label_id_t; using vertex_range_t = typename fragment_t::vertex_range_t; using vertex_t = typename fragment_t::vertex_t; using nbr_t = typename fragment_t::nbr_t; using vertex_map_t = typename fragment_t::vertex_map_t; using adj_list_t = PIEAdjList<fragment_t>; public: PythonPIEFragment() = default; ~PythonPIEFragment() {} fid_t fid() { return fragment_->fid(); } fid_t fnum() { return fragment_->fnum(); } label_id_t vertex_label_num() const { return fragment_->vertex_label_num(); } label_id_t vertex_label(const vertex_t& v) const { return fragment_->vertex_label(v); } int64_t vertex_offset(const vertex_t& v) const { return fragment_->vertex_offset(v); } label_id_t edge_label_num() const { return fragment_->edge_label_num(); } std::shared_ptr<arrow::Table> vertex_data_table(label_id_t i) const { return fragment_->vertex_data_table(i); } std::shared_ptr<arrow::Table> edge_data_table(label_id_t i) const { return fragment_->edge_data_table(i); } size_t get_total_nodes_num() const { return fragment_->GetTotalNodesNum(); } size_t get_inner_nodes_num(label_id_t label_id) const { return fragment_->GetInnerVerticesNum(label_id); } size_t get_outer_nodes_num(label_id_t label_id) const { return fragment_->GetOuterVerticesNum(label_id); } size_t get_nodes_num(label_id_t label_id) const { return nodes(label_id).size(); } vertex_range_t nodes(label_id_t label_id) const { return fragment_->Vertices(label_id); } vertex_range_t inner_nodes(label_id_t label_id) const { return fragment_->InnerVertices(label_id); } vertex_range_t outer_nodes(label_id_t label_id) const { return fragment_->OuterVertices(label_id); } int get_node_fid(const vertex_t& v) const { return fragment_->GetFragId(v); } bool is_inner_node(const vertex_t& v) { return fragment_->IsInnerVertex(v); } bool is_outer_node(const vertex_t& v) { return fragment_->IsOuterVertex(v); } bool get_node(label_id_t label, const std::string& oid, vertex_t& v) { return fragment_->GetVertex(label, boost::lexical_cast<oid_t>(oid), v); } bool get_inner_node(label_id_t label, const std::string& oid, vertex_t& v) { return fragment_->GetInnerVertex(label, boost::lexical_cast<oid_t>(oid), v); } bool get_outer_node(label_id_t label, const std::string& oid, vertex_t& v) { return fragment_->GetOuterVertex(label, boost::lexical_cast<oid_t>(oid), v); } bool get_node_by_gid(vid_t gid, vertex_t& v) { return fragment_->Gid2Vertex(gid, v); } std::string get_node_id(const vertex_t& v) const { return boost::lexical_cast<std::string>(fragment_->GetId(v)); } uint64_t get_inner_node_gid(const vertex_t& v) { return fragment_->GetInnerVertexGid(v); } uint64_t get_outer_node_gid(const vertex_t& v) { return fragment_->GetOuterVertexGid(v); } bool get_gid_by_oid(const std::string& oid, vid_t& gid) { return fragment_->Oid2Gid(boost::lexical_cast<oid_t>(oid), gid); } adj_list_t get_outgoing_edges(const vertex_t& v, label_id_t e_label) { return adj_list_t(fragment_->GetOutgoingAdjList(v, e_label)); } adj_list_t get_incoming_edges(const vertex_t& v, label_id_t e_label) { return adj_list_t(fragment_->GetIncomingAdjList(v, e_label)); } bool has_child(const vertex_t& v, label_id_t e_label) { return fragment_->HasChild(v, e_label); } bool has_parent(const vertex_t& v, label_id_t e_label) { return fragment_->HasParent(v, e_label); } int get_indegree(const vertex_t& v, label_id_t e_label) { return fragment_->GetLocalInDegree(v, e_label); } int get_outdegree(const vertex_t& v, label_id_t e_label) { return fragment_->GetLocalInDegree(v, e_label); } std::string get_str(const vertex_t& v, prop_id_t prop_id) const { return fragment_->template GetData<std::string>(v, prop_id); } double get_double(const vertex_t& v, prop_id_t prop_id) const { return fragment_->template GetData<double>(v, prop_id); } int64_t get_int(const vertex_t& v, prop_id_t prop_id) const { return fragment_->template GetData<int64_t>(v, prop_id); } bool Gid2Vertex(const vid_t& gid, vertex_t& v) const { return fragment_->Gid2Vertex(gid, v); } // schema prop_id_t vertex_property_num(label_id_t v_label_id) const { return fragment_->vertex_property_num(v_label_id); } prop_id_t vertex_property_num(const std::string& v_label) const { label_id_t v_label_id = get_vertex_label_id_by_name(v_label); return vertex_property_num(v_label_id); } prop_id_t edge_property_num(label_id_t e_label_id) const { return fragment_->edge_property_num(e_label_id); } prop_id_t edge_property_num(const std::string& e_label) const { label_id_t e_label_id = get_edge_label_id_by_name(e_label); return edge_property_num(e_label_id); } std::vector<std::string> vertex_labels() const { return fragment_->schema().GetVertexLabels(); } std::vector<std::string> edge_labels() const { return fragment_->schema().GetEdgeLabels(); } std::string get_vertex_label_by_id(label_id_t v_label_id) const { return fragment_->schema().GetVertexLabelName(v_label_id); } label_id_t get_vertex_label_id_by_name(const std::string& name) const { return fragment_->schema().GetVertexLabelId(name); } std::string get_edge_label_by_id(label_id_t e_label_id) const { return fragment_->schema().GetEdgeLabelName(e_label_id); } label_id_t get_edge_label_id_by_name(const std::string& name) const { return fragment_->schema().GetEdgeLabelId(name); } std::vector<std::pair<std::string, std::string>> vertex_properties( const std::string& label) const { return fragment_->schema().GetVertexPropertyListByLabel(label); } std::vector<std::pair<std::string, std::string>> vertex_properties( label_id_t label_id) const { return fragment_->schema().GetVertexPropertyListByLabel(label_id); } std::vector<std::pair<std::string, std::string>> edge_properties( const std::string& label) const { return fragment_->schema().GetEdgePropertyListByLabel(label); } std::vector<std::pair<std::string, std::string>> edge_properties( label_id_t label_id) const { return fragment_->schema().GetEdgePropertyListByLabel(label_id); } prop_id_t get_vertex_property_id_by_name(const std::string& v_label, const std::string& name) const { label_id_t v_label_id = fragment_->schema().GetVertexLabelId(v_label); return get_vertex_property_id_by_name(v_label_id, name); } prop_id_t get_vertex_property_id_by_name(label_id_t v_label_id, const std::string& name) const { return fragment_->schema().GetVertexPropertyId(v_label_id, name); } std::string get_vertex_property_by_id(const std::string& v_label, prop_id_t v_prop_id) const { label_id_t v_label_id = fragment_->schema().GetVertexLabelId(v_label); return get_vertex_property_by_id(v_label_id, v_prop_id); } std::string get_vertex_property_by_id(label_id_t v_label_id, prop_id_t v_prop_id) const { return fragment_->schema().GetVertexPropertyName(v_label_id, v_prop_id); } prop_id_t get_edge_property_id_by_name(const std::string& e_label, const std::string& name) const { label_id_t e_label_id = fragment_->schema().GetEdgeLabelId(e_label); return get_edge_property_id_by_name(e_label_id, name); } prop_id_t get_edge_property_id_by_name(label_id_t e_label_id, const std::string& name) const { return fragment_->schema().GetEdgePropertyId(e_label_id, name); } std::string get_edge_property_by_id(const std::string& e_label, prop_id_t e_prop_id) const { label_id_t e_label_id = fragment_->schema().GetEdgeLabelId(e_label); return get_edge_property_by_id(e_label_id, e_prop_id); } std::string get_edge_property_by_id(label_id_t e_label_id, prop_id_t e_prop_id) const { return fragment_->schema().GetEdgePropertyName(e_label_id, e_prop_id); } vid_t Vertex2Gid(const vertex_t& v) const { return fragment_->Vertex2Gid(v); } std::shared_ptr<vertex_map_t> GetVertexMap() { return fragment_->GetVertexMap(); } void set_fragment(const fragment_t* fragment) { fragment_ = fragment; } private: const fragment_t* fragment_; }; template <typename FRAG_T, typename VD_T, typename MD_T> class PythonPIEComputeContext { using fragment_t = FRAG_T; using oid_t = typename fragment_t::oid_t; using vid_t = typename fragment_t::vid_t; using eid_t = typename fragment_t::eid_t; using prop_id_t = typename fragment_t::prop_id_t; using label_id_t = typename fragment_t::label_id_t; using vertex_range_t = typename fragment_t::vertex_range_t; using vertex_t = typename fragment_t::vertex_t; using nbr_t = typename fragment_t::nbr_t; using adj_list_t = typename fragment_t::adj_list_t; using vertex_map_t = typename fragment_t::vertex_map_t; public: // exposes to PregelContext using vd_t = VD_T; explicit PythonPIEComputeContext( std::vector<grape::VertexArray<typename FRAG_T::vertices_t, VD_T>>& data) : superstep_(0), data_(data) {} ~PythonPIEComputeContext() {} void init(const fragment_t& frag) { superstep_ = 0; auto v_label_num = frag.vertex_label_num(); for (label_id_t v_label = 0; v_label < v_label_num; v_label++) { partial_result_.emplace_back(data_[v_label]); } } void inc_superstep() { superstep_++; } int superstep() { return superstep_; } void set_config(const std::string& key, const std::string& value) { config_.emplace(key, value); } std::string get_config(const std::string& key) { auto iter = config_.find(key); if (iter != config_.end()) { return iter->second; } else { return ""; } } void set_node_value(vertex_t& v, VD_T value) { auto label = fragment_->vertex_label(v); partial_result_[label].SetValue(v, value); } VD_T get_node_value(const vertex_t& v) { auto label = fragment_->vertex_label(v); return partial_result_[label][v]; } void init_value(vertex_range_t vertices, label_id_t label, VD_T value, const std::function<bool(MD_T*, MD_T&&)>& aggregator) { partial_result_[label].Init(vertices, value, aggregator); } void init_value(vertex_range_t vertices, label_id_t label, VD_T value, PIEAggregateType type) { partial_result_[label].Init(vertices, value, AggregateFactory::CreateAggregate<MD_T>(type)); } bool is_updated(const vertex_t& v) { auto label = fragment_->vertex_label(v); return partial_result_[label].IsUpdated(v); } grape::SyncBuffer<typename FRAG_T::vertices_t, VD_T>& partial_result( label_id_t label) { return partial_result_[label]; } void register_sync_buffer(label_id_t label_id, grape::MessageStrategy message_strategy) { CHECK(fragment_); message_manager_->RegisterSyncBuffer( *fragment_, label_id, &partial_result_[label_id], message_strategy); } void set_fragment(const fragment_t* fragment) { fragment_ = fragment; } void set_message_manager( PropertyAutoMessageManager<fragment_t>* message_manager) { message_manager_ = message_manager; } private: int superstep_; std::unordered_map<std::string, std::string> config_; const fragment_t* fragment_; PropertyAutoMessageManager<fragment_t>* message_manager_; // message auto parallel std::vector<grape::VertexArray<typename FRAG_T::vertices_t, VD_T>>& data_; std::vector<grape::SyncBuffer<typename FRAG_T::vertices_t, VD_T>> partial_result_; }; template <typename FRAG_T> class PIEAdjList { using fragment_t = FRAG_T; using nbr_t = typename fragment_t::nbr_t; using nbr_iterator_t = const nbr_t*; using adj_list_t = typename fragment_t::adj_list_t; public: PIEAdjList() : adj_list_() {} explicit PIEAdjList(adj_list_t adj_list) : adj_list_(adj_list) {} ~PIEAdjList() = default; class iterator { using pointer_type = nbr_t*; using reference_type = nbr_t&; public: iterator() noexcept : nbr_() {} explicit iterator(nbr_t&& nbr) : nbr_(std::move(nbr)) {} explicit iterator(const nbr_t& nbr) : nbr_(nbr) {} iterator& operator=(const nbr_t& nbr) { nbr_ = nbr; } iterator& operator=(nbr_t&& nbr) { nbr_ = std::move(nbr); } reference_type operator*() noexcept { return nbr_; } pointer_type operator->() noexcept { return &nbr_; } iterator& operator++() noexcept { ++nbr_; return *this; } const iterator operator++(int) noexcept { return iterator(std::move(nbr_++)); } bool operator==(const iterator& rhs) noexcept { return nbr_ == rhs.nbr_; } bool operator!=(const iterator& rhs) noexcept { return nbr_ != rhs.nbr_; } private: nbr_t nbr_; }; iterator begin() { return iterator(adj_list_.begin()); } iterator end() { return iterator(adj_list_.end()); } size_t size() { return adj_list_.size(); } private: adj_list_t adj_list_; }; } // namespace gs #endif // ANALYTICAL_ENGINE_APPS_PYTHON_PIE_WRAPPER_H_