analytical_engine/benchmarks/apps/pagerank/property_pagerank.h (151 lines of code) (raw):

/** Copyright 2020 Alibaba Group Holding Limited. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef ANALYTICAL_ENGINE_BENCHMARKS_APPS_PAGERANK_PROPERTY_PAGERANK_H_ #define ANALYTICAL_ENGINE_BENCHMARKS_APPS_PAGERANK_PROPERTY_PAGERANK_H_ #include <iomanip> #include <limits> #include "grape/grape.h" #include "core/app/parallel_property_app_base.h" #include "core/context/vertex_data_context.h" #include "core/worker/parallel_property_worker.h" namespace gs { namespace benchmarks { template <typename FRAG_T> class PropertyPageRankContext : public LabeledVertexDataContext<FRAG_T, double> { public: using oid_t = typename FRAG_T::oid_t; using vid_t = typename FRAG_T::vid_t; explicit PropertyPageRankContext(const FRAG_T& fragment) : LabeledVertexDataContext<FRAG_T, double>(fragment, true), result(this->data()[0]) {} void Init(ParallelPropertyMessageManager& messages, double delta, int max_round) { auto& frag = this->fragment(); auto vertices = frag.Vertices(0); auto inner_vertices = frag.InnerVertices(0); this->delta = delta; this->max_round = max_round; degree.Init(inner_vertices, 0); result.Init(vertices, 0.0); next_result.Init(vertices); step = 0; } void Output(std::ostream& os) { auto& frag = this->fragment(); auto inner_vertices = frag.InnerVertices(0); for (auto v : inner_vertices) { if (degree[v] == 0) { os << frag.GetId(v) << " " << std::scientific << std::setprecision(15) << result[v] << std::endl; } else { os << frag.GetId(v) << " " << std::scientific << std::setprecision(15) << result[v] * degree[v] << std::endl; } } /* for (auto v : inner_vertices) { os << frag.GetId(v) << " " << result[v] << std::endl; } */ } typename FRAG_T::template vertex_array_t<int> degree; typename FRAG_T::template vertex_array_t<double>& result; typename FRAG_T::template vertex_array_t<double> next_result; vid_t dangling_vnum = 0; int step = 0; int max_round = 0; double delta = 0; double dangling_sum = 0.0; }; template <typename FRAG_T> class PropertyPageRank : public ParallelPropertyAppBase<FRAG_T, PropertyPageRankContext<FRAG_T>>, public grape::ParallelEngine, public grape::Communicator { public: static constexpr grape::MessageStrategy message_strategy = grape::MessageStrategy::kAlongOutgoingEdgeToOuterVertex; static constexpr grape::LoadStrategy load_strategy = grape::LoadStrategy::kBothOutIn; INSTALL_PARALLEL_PROPERTY_WORKER(PropertyPageRank<FRAG_T>, PropertyPageRankContext<FRAG_T>, FRAG_T) using vertex_t = typename fragment_t::vertex_t; void PEval(const fragment_t& frag, context_t& ctx, message_manager_t& messages) { auto inner_vertices = frag.InnerVertices(0); size_t graph_vnum = frag.GetTotalVerticesNum(0); messages.InitChannels(thread_num()); ctx.step = 0; double p = 1.0 / graph_vnum; // assign initial ranks ForEach(inner_vertices, [&ctx, &frag, p, &messages](int tid, vertex_t u) { int EdgeNum = frag.GetOutgoingAdjList(u, 0).Size(); ctx.degree[u] = EdgeNum; if (EdgeNum > 0) { ctx.result[u] = p / EdgeNum; messages.SendMsgThroughOEdges<fragment_t, double>(frag, u, 0, ctx.result[u], tid); } else { ctx.result[u] = p; } }); for (auto u : inner_vertices) { if (ctx.degree[u] == 0) { ++ctx.dangling_vnum; } } double dangling_sum = p * static_cast<double>(ctx.dangling_vnum); Sum(dangling_sum, ctx.dangling_sum); messages.ForceContinue(); } void IncEval(const fragment_t& frag, context_t& ctx, message_manager_t& messages) { auto inner_vertices = frag.InnerVertices(0); double dangling_sum = ctx.dangling_sum; size_t graph_vnum = frag.GetTotalVerticesNum(0); ++ctx.step; if (ctx.step > ctx.max_round) { return; } double base = (1.0 - ctx.delta) / graph_vnum + ctx.delta * dangling_sum / graph_vnum; // process received ranks sent by other workers { messages.ParallelProcess<fragment_t, double>( thread_num(), frag, [&ctx](int tid, vertex_t u, const double& msg) { ctx.result[u] = msg; }); } // compute new ranks and send messages if (ctx.step != ctx.max_round) { ForEach(inner_vertices, [&ctx, base, &frag, &messages](int tid, vertex_t u) { if (ctx.degree[u] == 0) { ctx.next_result[u] = base; } else { double cur = 0; auto es = frag.GetIncomingAdjList(u, 0); for (auto& e : es) { cur += ctx.result[e.get_neighbor()]; } cur = (ctx.delta * cur + base) / ctx.degree[u]; ctx.next_result[u] = cur; messages.SendMsgThroughOEdges<fragment_t, double>( frag, u, 0, ctx.next_result[u], tid); } }); } else { ForEach(inner_vertices, [&ctx, base, &frag](int tid, vertex_t u) { if (ctx.degree[u] == 0) { ctx.next_result[u] = base; } else { double cur = 0; auto es = frag.GetIncomingAdjList(u, 0); for (auto& e : es) { cur += ctx.result[e.get_neighbor()]; } cur = (ctx.delta * cur + base) / ctx.degree[u]; ctx.next_result[u] = cur; } }); } ctx.result.Swap(ctx.next_result); double new_dangling = base * static_cast<double>(ctx.dangling_vnum); Sum(new_dangling, ctx.dangling_sum); messages.ForceContinue(); } }; } // namespace benchmarks } // namespace gs #endif // ANALYTICAL_ENGINE_BENCHMARKS_APPS_PAGERANK_PROPERTY_PAGERANK_H_