analytical_engine/core/parallel/property_message_manager.h (49 lines of code) (raw):

/** Copyright 2020 Alibaba Group Holding Limited. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef ANALYTICAL_ENGINE_CORE_PARALLEL_PROPERTY_MESSAGE_MANAGER_H_ #define ANALYTICAL_ENGINE_CORE_PARALLEL_PROPERTY_MESSAGE_MANAGER_H_ #include "grape/graph/adj_list.h" #include "grape/parallel/default_message_manager.h" namespace gs { /** * @brief Property message manager. * * The send and recv methods are not thread-safe. */ class PropertyMessageManager : public grape::DefaultMessageManager { public: /** * @brief Communication via a crossing edge a<-c. It sends message * from a to c. * * @tparam GRAPH_T * @tparam MESSAGE_T * @param frag * @param v: a * @param msg */ template <typename GRAPH_T, typename MESSAGE_T> inline void SendMsgThroughIEdges(const GRAPH_T& frag, const typename GRAPH_T::vertex_t& v, const typename GRAPH_T::label_id_t e_label, const MESSAGE_T& msg) { grape::DestList dsts = frag.IEDests(v, e_label); auto* ptr = dsts.begin; typename GRAPH_T::vid_t gid = frag.GetInnerVertexGid(v); while (ptr != dsts.end) { auto fid = *(ptr++); to_send_[fid] << gid << msg; } } /** * @brief Communication via a crossing edge a->b. It sends message * from a to b. * * @tparam GRAPH_T * @tparam MESSAGE_T * @param frag * @param v: a * @param msg */ template <typename GRAPH_T, typename MESSAGE_T> inline void SendMsgThroughOEdges(const GRAPH_T& frag, const typename GRAPH_T::vertex_t& v, const typename GRAPH_T::label_id_t e_label, const MESSAGE_T& msg) { auto dsts = frag.OEDests(v, e_label); auto* ptr = dsts.begin; typename GRAPH_T::vid_t gid = frag.GetInnerVertexGid(v); while (ptr != dsts.end) { auto fid = *(ptr++); to_send_[fid] << gid << msg; } } /** * @brief Communication via crossing edges a->b and a<-c. It sends message * from a to b and c. * * @tparam GRAPH_T * @tparam MESSAGE_T * @param frag * @param v: a * @param msg */ template <typename GRAPH_T, typename MESSAGE_T> inline void SendMsgThroughEdges(const GRAPH_T& frag, const typename GRAPH_T::vertex_t& v, const typename GRAPH_T::label_id_t e_label, const MESSAGE_T& msg) { auto dsts = frag.IOEDests(v, e_label); auto* ptr = dsts.begin; typename GRAPH_T::vid_t gid = frag.GetInnerVertexGid(v); while (ptr != dsts.end) { auto fid = *(ptr++); to_send_[fid] << gid << msg; } } }; } // namespace gs #endif // ANALYTICAL_ENGINE_CORE_PARALLEL_PROPERTY_MESSAGE_MANAGER_H_