analytical_engine/apps/pregel/sssp_pregel.h (65 lines of code) (raw):

/** Copyright 2020 Alibaba Group Holding Limited. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef ANALYTICAL_ENGINE_APPS_PREGEL_SSSP_PREGEL_H_ #define ANALYTICAL_ENGINE_APPS_PREGEL_SSSP_PREGEL_H_ #include <algorithm> #include <limits> #include <string> #include "vineyard/graph/fragment/arrow_fragment.h" #include "core/app/pregel/i_vertex_program.h" #include "core/app/pregel/pregel_compute_context.h" #include "core/app/pregel/pregel_property_app_base.h" namespace gs { class PregelSSSP : public IPregelProgram< PregelPropertyVertex< vineyard::ArrowFragment<vineyard::property_graph_types::OID_TYPE, vineyard::property_graph_types::VID_TYPE>, double, double>, PregelPropertyComputeContext< vineyard::ArrowFragment<vineyard::property_graph_types::OID_TYPE, vineyard::property_graph_types::VID_TYPE>, double, double>> { using fragment_t = vineyard::ArrowFragment<vineyard::property_graph_types::OID_TYPE, vineyard::property_graph_types::VID_TYPE>; public: void Init(PregelPropertyVertex<fragment_t, double, double>& v, PregelPropertyComputeContext<fragment_t, double, double>& context) override { v.set_value(std::numeric_limits<double>::max()); } void Compute(grape::IteratorPair<double*> messages, PregelPropertyVertex<fragment_t, double, double>& v, PregelPropertyComputeContext<fragment_t, double, double>& context) override { bool updated = false; if (context.superstep() == 0) { std::string source_id = context.get_config("src"); if (v.id() == source_id) { updated = true; v.set_value(0); } } else { double cur_value = v.value(); double new_value = cur_value; for (auto msg : messages) { new_value = std::min(new_value, msg); } if (new_value != cur_value) { v.set_value(new_value); updated = true; } } if (updated) { double dist = v.value(); for (int label_id = 0; label_id < context.edge_label_num(); label_id++) { for (auto& e : v.outgoing_edges(label_id)) { double new_dist = dist + static_cast<double>(e.get_int(0)); v.send(e.vertex(), new_dist); } } } v.vote_to_halt(); } }; } // namespace gs #endif // ANALYTICAL_ENGINE_APPS_PREGEL_SSSP_PREGEL_H_