analytical_engine/apps/property/sssp_property_append.h (204 lines of code) (raw):
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef ANALYTICAL_ENGINE_APPS_PROPERTY_SSSP_PROPERTY_APPEND_H_
#define ANALYTICAL_ENGINE_APPS_PROPERTY_SSSP_PROPERTY_APPEND_H_
#include <iomanip>
#include <limits>
#include <vector>
#include "grape/grape.h"
#include "core/app/property_app_base.h"
#include "core/context/vertex_data_context.h"
namespace gs {
/**
 * Per-fragment state for the SSSP app on labeled, appendable graphs.
 *
 * One tentative distance (double) is stored per vertex, grouped by vertex
 * label. std::numeric_limits<double>::max() is used as the "no distance
 * assigned yet" sentinel.
 *
 * @tparam FRAG_T fragment type; must provide vid_t, oid_t and label_id_t
 */
template <typename FRAG_T>
class SSSPPropertyAppendContext
    : public LabeledVertexDataContext<FRAG_T, double> {
  using vid_t = typename FRAG_T::vid_t;
  using oid_t = typename FRAG_T::oid_t;
  using label_id_t = typename FRAG_T::label_id_t;

 public:
  /**
   * Binds comp_id to the per-label arrays returned by the base class's
   * data().
   *
   * @param fragment the fragment this context belongs to
   */
  explicit SSSPPropertyAppendContext(const FRAG_T& fragment)
      : LabeledVertexDataContext<FRAG_T, double>(fragment, true),
        comp_id(this->data()) {}

  /**
   * Records the source vertex id and resets all per-vertex state: every
   * distance becomes the sentinel and both modification masks are cleared.
   *
   * @param messages   not used by this method
   * @param source_id_ original id of the vertex distances are measured from
   */
  void Init(grape::DefaultMessageManager& messages, oid_t source_id_) {
    auto& frag = this->fragment();
    const label_id_t v_label_num = frag.vertex_label_num();

    source_id = source_id_;
    curr_modified.resize(v_label_num);
    next_modified.resize(v_label_num);

    for (label_id_t v_label = 0; v_label != v_label_num; ++v_label) {
      auto vertices = frag.Vertices(v_label);
      comp_id[v_label].SetValue(std::numeric_limits<double>::max());
      curr_modified[v_label].Init(vertices, false);
      next_modified[v_label].Init(vertices, false);
    }
  }

  /**
   * Writes one line per inner vertex, label by label: "<id> infinity" when
   * the stored distance equals the sentinel, otherwise "<id> <distance>" with
   * the distance in scientific notation at precision 15.
   *
   * Note that std::scientific and the precision stay set on `os` after the
   * first finite distance has been written.
   *
   * @param os destination stream
   */
  void Output(std::ostream& os) override {
    auto& frag = this->fragment();
    const label_id_t v_label_num = frag.vertex_label_num();

    for (label_id_t i = 0; i < v_label_num; ++i) {
      auto iv = frag.InnerVertices(i);
      for (auto v : iv) {
        const double d = comp_id[i][v];
        // Exact comparison is intended: the sentinel is only ever stored
        // verbatim, never computed.
        if (d == std::numeric_limits<double>::max()) {
          os << frag.GetId(v) << " infinity" << '\n';
        } else {
          os << frag.GetId(v) << " " << std::scientific << std::setprecision(15)
             << d << '\n';
        }
      }
    }
    // A single flush replaces the per-line flush that std::endl performed.
    os.flush();
  }

  // Original id of the source vertex, as passed to Init().
  oid_t source_id;
  // Tentative shortest distance per vertex, indexed [vertex label][vertex].
  // Refers to the storage returned by the base class's data().
  std::vector<typename FRAG_T::template vertex_array_t<double>>& comp_id;
  // Vertices whose distance changed and that are to be expanded in the
  // current round, indexed [vertex label][vertex].
  std::vector<typename FRAG_T::template vertex_array_t<bool>> curr_modified;
  // Vertices whose distance was lowered during the current round, indexed
  // [vertex label][vertex].
  std::vector<typename FRAG_T::template vertex_array_t<bool>> next_modified;
};
/**
 * Single-source shortest path over a labeled, appendable property fragment.
 *
 * Edge weights are read as int64_t from edge data column 0 and accumulated as
 * double. For every edge label both the regular outgoing adjacency list and
 * the "extra" outgoing adjacency list are traversed.
 * NOTE(review): the extra list presumably holds edges appended after the
 * fragment was built -- confirm against the fragment implementation.
 *
 * @tparam FRAG_T fragment type
 */
template <typename FRAG_T>
class SSSPPropertyAppend
    : public PropertyAppBase<FRAG_T, SSSPPropertyAppendContext<FRAG_T>> {
 public:
  INSTALL_DEFAULT_PROPERTY_WORKER(SSSPPropertyAppend<FRAG_T>,
                                  SSSPPropertyAppendContext<FRAG_T>, FRAG_T)
  using vid_t = typename FRAG_T::vid_t;
  using vertex_t = typename FRAG_T::vertex_t;
  using label_id_t = typename FRAG_T::label_id_t;

  /**
   * First round. If ctx.source_id is an inner vertex of this fragment (under
   * any vertex label), its distance is set to 0 and its outgoing edges are
   * relaxed; otherwise the method returns without touching any state.
   *
   * @param frag     local fragment
   * @param ctx      per-fragment SSSP state
   * @param messages message manager used to sync outer vertices and to
   *                 request another round
   */
  void PEval(const fragment_t& frag, context_t& ctx,
             message_manager_t& messages) {
    const label_id_t v_label_num = frag.vertex_label_num();

    // Probe each vertex label until one of them owns the source id.
    vertex_t source;
    bool native_source = false;
    for (label_id_t i = 0; i < v_label_num; ++i) {
      native_source = frag.GetInnerVertex(i, ctx.source_id, source);
      if (native_source) {
        break;
      }
    }
    if (!native_source) {
      return;
    }

    ctx.comp_id[frag.vertex_label(source)][source] = 0;
    // Base distance 0.0 makes the candidate distance equal the edge weight.
    RelaxOutgoingEdges(frag, ctx, source, 0.0);
    FinishRound(frag, ctx, messages);
  }

  /**
   * Subsequent rounds. Applies incoming distance messages, expands every
   * inner vertex flagged in ctx.curr_modified, then syncs modified outer
   * vertices and requests another round if any inner vertex was lowered.
   *
   * @param frag     local fragment
   * @param ctx      per-fragment SSSP state
   * @param messages message manager to receive from and send through
   */
  void IncEval(const fragment_t& frag, context_t& ctx,
               message_manager_t& messages) {
    {
      vertex_t v(0);
      double val;
      while (messages.GetMessage<fragment_t, double>(frag, v, val)) {
        const label_id_t v_label = frag.vertex_label(v);
        // Keep the smaller distance and schedule the vertex for expansion.
        if (ctx.comp_id[v_label][v] > val) {
          ctx.comp_id[v_label][v] = val;
          ctx.curr_modified[v_label][v] = true;
        }
      }
    }

    const label_id_t v_label_num = frag.vertex_label_num();
    for (label_id_t i = 0; i < v_label_num; ++i) {
      auto iv = frag.InnerVertices(i);
      for (auto v : iv) {
        if (!ctx.curr_modified[i][v]) {
          continue;
        }
        ctx.curr_modified[i][v] = false;
        // The distance is passed by value, so the whole expansion of v uses
        // the value v had when its expansion started.
        RelaxOutgoingEdges(frag, ctx, v, ctx.comp_id[i][v]);
      }
    }

    FinishRound(frag, ctx, messages);
  }

 private:
  // Relaxes every edge in `edges`: a neighbor whose stored distance exceeds
  // base_dist + weight gets the smaller value and is flagged in next_modified.
  template <typename ADJ_LIST_T>
  static void RelaxEdges(const fragment_t& frag, context_t& ctx,
                         ADJ_LIST_T&& edges, double base_dist) {
    for (auto& e : edges) {
      auto u = e.neighbor();
      const double u_dist =
          base_dist + static_cast<double>(e.template get_data<int64_t>(0));
      const label_id_t u_label = frag.vertex_label(u);
      if (ctx.comp_id[u_label][u] > u_dist) {
        ctx.comp_id[u_label][u] = u_dist;
        ctx.next_modified[u_label][u] = true;
      }
    }
  }

  // Relaxes all outgoing edges of `v`: for each edge label, first the regular
  // adjacency list, then the extra one.
  static void RelaxOutgoingEdges(const fragment_t& frag, context_t& ctx,
                                 const vertex_t& v, double v_dist) {
    const label_id_t e_label_num = frag.edge_label_num();
    for (label_id_t j = 0; j < e_label_num; ++j) {
      RelaxEdges(frag, ctx, frag.GetOutgoingAdjList(v, j), v_dist);
      RelaxEdges(frag, ctx, frag.GetExtraOutgoingAdjList(v, j), v_dist);
    }
  }

  // Returns true if at least one inner vertex is flagged in next_modified.
  static bool HasModifiedInnerVertex(const fragment_t& frag, context_t& ctx) {
    const label_id_t v_label_num = frag.vertex_label_num();
    for (label_id_t i = 0; i < v_label_num; ++i) {
      auto iv = frag.InnerVertices(i);
      for (auto v : iv) {
        if (ctx.next_modified[i][v]) {
          return true;
        }
      }
    }
    return false;
  }

  // Common tail of a round: sync every modified outer vertex and clear its
  // flag, call ForceContinue() if any inner vertex is still flagged, then
  // swap the two modification masks.
  static void FinishRound(const fragment_t& frag, context_t& ctx,
                          message_manager_t& messages) {
    const label_id_t v_label_num = frag.vertex_label_num();
    for (label_id_t i = 0; i < v_label_num; ++i) {
      auto ov = frag.OuterVertices(i);
      for (auto v : ov) {
        if (ctx.next_modified[i][v]) {
          messages.SyncStateOnOuterVertex<fragment_t, double>(
              frag, v, ctx.comp_id[i][v]);
          ctx.next_modified[i][v] = false;
        }
      }
    }

    if (HasModifiedInnerVertex(frag, ctx)) {
      messages.ForceContinue();
    }

    ctx.curr_modified.swap(ctx.next_modified);
  }
};
} // namespace gs
#endif // ANALYTICAL_ENGINE_APPS_PROPERTY_SSSP_PROPERTY_APPEND_H_