in src/toolkits/graph_analytics/sssp_sgraph.cpp [298:544]
std::vector<flexible_type> _all_shortest_paths(std::shared_ptr<unity_sgraph> sourcegraph,
std::vector<flexible_type> sources,
std::vector<flexible_type> dests,
std::string weight_column){
// create fast lookup sets
std::set<flexible_type> source_set, dest_set;
for (auto i : sources) source_set.insert(i);
for (auto i : dests) dest_set.insert(i);
// lets get a minimal graph to work on
sgraph g(sourcegraph->get_graph());
g.select_vertex_fields({sgraph::VID_COLUMN_NAME});
if (weight_column.empty()) {
g.select_edge_fields({sgraph::SRC_COLUMN_NAME, sgraph::DST_COLUMN_NAME});
} else {
g.select_edge_fields({sgraph::SRC_COLUMN_NAME, sgraph::DST_COLUMN_NAME, weight_column});
}
// add a numeric vertex ID column
{
std::vector<flex_int> vids;
vids.reserve(g.num_vertices());
for (size_t i = 0;i < g.num_vertices(); ++i) vids.push_back(i);
auto numeric_ids = std::make_shared<sarray<flexible_type>>();
numeric_ids->open_for_write(1); numeric_ids->set_type(flex_type_enum::INTEGER);
std::copy(vids.begin(), vids.end(), numeric_ids->get_output_iterator(0));
numeric_ids->close();
g.add_vertex_field(numeric_ids, "__sssp_numeric_vertex_id__");
}
size_t id_idx = g.get_vertex_field_id(sgraph::VID_COLUMN_NAME);
size_t numeric_id_idx = g.get_vertex_field_id("__sssp_numeric_vertex_id__");
size_t weight_idx = (size_t)(-1);
if (!weight_column.empty()) {
weight_idx = g.get_edge_field_id(weight_column);
}
bool found_source_vertex = false;
bool found_dest_vertex = false;
/*
* SOURCE vertices are marked as distance 1
* DEST vertices are marked as distance -1
* ALL other vertices are marked as distance 0
*
* We expand from SOURCE increasing.
* We expand from DEST decreasing.
* When a positive number hits a negative number, we are done.
*/
struct vertex_data {
double distance = 0; // if < 0, -(distance+1) is distance to sink
// if > 0 distance-1 is distance to source
// if 0, is undiscovered
flexible_type id; // the ID of the vertex
flex_int parent = (-1); // if distance < 0, is the next vertex to sink
// if distance > 0 is the next vertex to source
// if (-1) is undiscovered
double parent_weight = 0; // edge weight to parent
};
std::vector<vertex_data> vertices(g.num_vertices());
// load all the vertex data into memory
{
parallel_for(0, g.get_num_partitions(),
[&](size_t segid) {
auto& vertex_frame = g.vertex_partition(segid);
auto reader = vertex_frame.get_reader(1);
for(auto iter = reader->begin(0);
iter != reader->end(0);
++iter) {
auto numeric_id = (*iter)[numeric_id_idx];
auto id = (*iter)[id_idx];
vertices[numeric_id].id = id;
if (source_set.count(id)) {
found_source_vertex = true;
vertices[numeric_id].distance = 1;
} else if (dest_set.count(id)) {
found_dest_vertex = true;
vertices[numeric_id].distance = -1;
}
}
});
if (!found_source_vertex) log_and_throw("Cannot find source vertices");
if (!found_dest_vertex) log_and_throw("Cannot find destination vertices");
}
g.remove_vertex_field(sgraph::VID_COLUMN_NAME);
g.rename_vertex_fields({"__sssp_numeric_vertex_id__"},{sgraph::VID_COLUMN_NAME});
id_idx = g.get_vertex_field_id(sgraph::VID_COLUMN_NAME);
// compute bidirectional sssp
std::atomic<int64_t> num_changed;
// this maps cost --> path. This is periodically pruned to always contain
// the best path
std::map<double, std::vector<flexible_type> > shortest_paths;
mutex shortest_paths_accumulated_lock;
// this contains [vid, vid] --> cost. This enumerates all the edges where
// source expansion meets the destination expansion.
std::map<std::pair<flex_int, flex_int>, double> paths_discovered;
mutex paths_discovered_lock;
sgraph_compute::sgraph_engine<flexible_type> ga;
sgraph_compute::triple_apply_fn_type apply_fn =
[&](sgraph_compute::edge_scope& scope) {
double weight = (weight_idx == (size_t)(-1)) ? 1.0 : (double)(scope.edge()[weight_idx]);
scope.lock_vertices();
flex_int source_id = (flex_int)scope.source()[id_idx];
flex_int dest_id = (flex_int)scope.target()[id_idx];
/*
* ASSERT_LT(source_id, vertices.size());
* ASSERT_LT(dest_id, vertices.size());
*/
vertex_data& source = vertices[source_id];
vertex_data& dest = vertices[dest_id];
if (source.distance > 0 && dest.distance >= 0) {
// source propagation
/*
* logprogress_stream
* << "Source Propagation: "
* << source.id << ":" << source.distance
* << " --> "
* << dest.id << ":" << dest.distance << std::endl;
*/
double new_cost = source.distance + weight;
if (dest.distance == 0 ||
dest.distance > new_cost) {
dest.distance = new_cost;
dest.parent = source_id;
dest.parent_weight = weight;
++num_changed;
}
}
else if (dest.distance < 0 && source.distance <= 0) {
// dest propagation. distances are negated and go downwards
// be careful of the signs of the compareisons
double new_cost = dest.distance - weight;
if (source.distance == 0 ||
source.distance < new_cost) {
source.distance = new_cost;
source.parent = dest_id;
source.parent_weight = weight;
++num_changed;
}
} else if (source.distance > 0 && dest.distance < 0) {
// On this edge, the source expansion meets the dest expansion.
// i.e. at this point a path has been discovered.
// compute the cost - 2 because source begins at cost 1, and dest begins begins at cost -1
double path_cost = source.distance + (-dest.distance) - 2 + weight;
auto edge_pair = std::make_pair(source_id, dest_id);
// have we found this exact path before?
bool path_found_before = false;
paths_discovered_lock.lock();
auto paths_discovered_iter = paths_discovered.find(edge_pair);
if (paths_discovered_iter != paths_discovered.end() &&
paths_discovered_iter->second <= path_cost) {
// we found the exact entry with an as good or better path listed.
path_found_before = true;
}
// only perform the rest of the path insertion if we have found a path before
if (!path_found_before) {
flex_list outpath;
vertex_data x = source;
outpath.push_back(x.id);
while(1) {
if (x.parent >= 0) {
x = vertices[x.parent];
outpath.push_back(x.id);
}
else break;
}
flex_list target_path;
x = dest;
target_path.push_back(x.id);
while(1) {
if (x.parent >= 0) {
x = vertices[x.parent];
target_path.push_back(x.id);
}
else break;
}
std::vector<flexible_type> path(outpath.rbegin(), outpath.rend());
path.insert(path.end(), target_path.begin(), target_path.end());
/*
* logprogress_stream << "Discovering path of cost " << path_cost
* << " meeting at "
* << source.id << " " << dest.id << std::endl;
*/
shortest_paths_accumulated_lock.lock();
// only insert if I am better or as good as the best path I have found
if (shortest_paths.size() == 0 || path_cost <= shortest_paths.begin()->first) {
shortest_paths[path_cost].push_back(path);
// trim down. any worse poaths
if (shortest_paths.size() > 1) {
auto iter = shortest_paths.begin(); ++iter;
shortest_paths.erase(iter, shortest_paths.end());
}
}
shortest_paths_accumulated_lock.unlock();
paths_discovered[edge_pair] = path_cost;
++num_changed;
}
paths_discovered_lock.unlock();
}
scope.unlock_vertices();
};
while(true) {
if(cppipc::must_cancel()) {
log_and_throw(std::string("Toolkit cancelled by user."));
}
num_changed = 0;
sgraph_compute::triple_apply(g, apply_fn, std::vector<std::string>(),
std::vector<std::string>());
logprogress_stream << "Num vertices updated: " << num_changed.load() << std::endl;
if (num_changed == 0) {
break;
}
// perform path acceleration
while(1) {
size_t acceleration_changes = 0;
for (size_t i = 0;i < vertices.size(); ++i) {
if (vertices[i].parent != -1) {
if (vertices[i].distance > 0 &&
vertices[i].distance > vertices[vertices[i].parent].distance + vertices[i].parent_weight) {
vertices[i].distance = vertices[vertices[i].parent].distance + vertices[i].parent_weight;
++acceleration_changes;
}
else if (vertices[i].distance < 0 &&
vertices[i].distance < vertices[vertices[i].parent].distance - vertices[i].parent_weight) {
vertices[i].distance = vertices[vertices[i].parent].distance - vertices[i].parent_weight;
++acceleration_changes;
}
}
}
logprogress_stream << "Num accelerated relaxations: " << acceleration_changes << std::endl;
if (acceleration_changes == 0) break;
}
}
if (shortest_paths.size() == 0) return std::vector<flexible_type>();
else return shortest_paths.begin()->second;
}