std::vector _all_shortest_paths()

in src/toolkits/graph_analytics/sssp_sgraph.cpp [298:544]


std::vector<flexible_type> _all_shortest_paths(std::shared_ptr<unity_sgraph> sourcegraph,
                                               std::vector<flexible_type> sources,
                                               std::vector<flexible_type> dests,
                                               std::string weight_column){
  // create fast lookup sets
  std::set<flexible_type> source_set, dest_set;
  for (auto i : sources) source_set.insert(i);
  for (auto i : dests) dest_set.insert(i);
  // lets get a minimal graph to work on
  sgraph g(sourcegraph->get_graph());

  g.select_vertex_fields({sgraph::VID_COLUMN_NAME});
  if (weight_column.empty()) {
    g.select_edge_fields({sgraph::SRC_COLUMN_NAME, sgraph::DST_COLUMN_NAME});
  } else {
    g.select_edge_fields({sgraph::SRC_COLUMN_NAME, sgraph::DST_COLUMN_NAME, weight_column});
  }
  // add a numeric vertex ID column
  {
    std::vector<flex_int> vids;
    vids.reserve(g.num_vertices());
    for (size_t i = 0;i < g.num_vertices(); ++i) vids.push_back(i);
    auto numeric_ids = std::make_shared<sarray<flexible_type>>();
    numeric_ids->open_for_write(1); numeric_ids->set_type(flex_type_enum::INTEGER);
    std::copy(vids.begin(), vids.end(), numeric_ids->get_output_iterator(0));
    numeric_ids->close();
    g.add_vertex_field(numeric_ids, "__sssp_numeric_vertex_id__");
  }

  size_t id_idx = g.get_vertex_field_id(sgraph::VID_COLUMN_NAME);
  size_t numeric_id_idx = g.get_vertex_field_id("__sssp_numeric_vertex_id__");
  size_t weight_idx = (size_t)(-1);

  if (!weight_column.empty()) {
    weight_idx = g.get_edge_field_id(weight_column);
  }
  bool found_source_vertex = false;
  bool found_dest_vertex = false;
  /*
   * SOURCE vertices are marked as distance 1
   * DEST vertices are marked as distance -1
   * ALL other vertices are marked as distance 0
   *
   * We expand from SOURCE increasing.
   * We expand from DEST decreasing.
   * When a positive number hits a negative number, we are done.
   */
  struct vertex_data {
    double distance = 0; // if < 0, -(distance+1) is distance to sink
                         // if > 0 distance-1 is distance to source
                         // if 0, is undiscovered
    flexible_type id;    // the ID of the vertex
    flex_int parent = (-1); // if distance < 0, is the next vertex to sink
                            // if distance > 0 is the next vertex to source
                            // if (-1) is undiscovered
    double parent_weight = 0; // edge weight to parent
  };


  std::vector<vertex_data> vertices(g.num_vertices());

  // load all the vertex data into memory
  {
    parallel_for(0, g.get_num_partitions(),
                 [&](size_t segid) {
                  auto& vertex_frame = g.vertex_partition(segid);
                  auto reader = vertex_frame.get_reader(1);
                   for(auto iter = reader->begin(0);
                       iter != reader->end(0);
                       ++iter) {
                     auto numeric_id = (*iter)[numeric_id_idx];
                     auto id = (*iter)[id_idx];
                     vertices[numeric_id].id = id;
                     if (source_set.count(id)) {
                       found_source_vertex = true;
                       vertices[numeric_id].distance = 1;
                     } else if (dest_set.count(id)) {
                       found_dest_vertex = true;
                       vertices[numeric_id].distance = -1;
                     }
                   }
                 });
    if (!found_source_vertex) log_and_throw("Cannot find source vertices");
    if (!found_dest_vertex) log_and_throw("Cannot find destination vertices");
  }
  g.remove_vertex_field(sgraph::VID_COLUMN_NAME);
  g.rename_vertex_fields({"__sssp_numeric_vertex_id__"},{sgraph::VID_COLUMN_NAME});
  id_idx = g.get_vertex_field_id(sgraph::VID_COLUMN_NAME);

  // compute bidirectional sssp

  std::atomic<int64_t> num_changed;
  // this maps cost --> path. This is periodically pruned to always contain
  // the best path
  std::map<double, std::vector<flexible_type> > shortest_paths;
  mutex shortest_paths_accumulated_lock;
  // this contains [vid, vid] --> cost. This enumerates all the edges where
  // source expansion meets the destination expansion.
  std::map<std::pair<flex_int, flex_int>, double> paths_discovered;
  mutex paths_discovered_lock;


  sgraph_compute::sgraph_engine<flexible_type> ga;

  sgraph_compute::triple_apply_fn_type apply_fn =
      [&](sgraph_compute::edge_scope& scope) {
        double weight = (weight_idx == (size_t)(-1)) ? 1.0 : (double)(scope.edge()[weight_idx]);
        scope.lock_vertices();
        flex_int source_id = (flex_int)scope.source()[id_idx];
        flex_int dest_id = (flex_int)scope.target()[id_idx];
        /*
         * ASSERT_LT(source_id, vertices.size());
         * ASSERT_LT(dest_id, vertices.size());
         */
        vertex_data& source = vertices[source_id];
        vertex_data& dest = vertices[dest_id];

        if (source.distance > 0 && dest.distance >= 0) {
          // source propagation
          /*
           * logprogress_stream
           *     << "Source Propagation: "
           *     << source.id << ":" << source.distance
           *     << " --> "
           *     << dest.id << ":" << dest.distance << std::endl;
           */
          double new_cost = source.distance + weight;
          if (dest.distance == 0 ||
              dest.distance > new_cost) {
            dest.distance = new_cost;
            dest.parent = source_id;
            dest.parent_weight = weight;
            ++num_changed;
          }
        }
        else if (dest.distance < 0 && source.distance <= 0) {
          // dest propagation. distances are negated and go downwards
          // be careful of the signs of the compareisons
          double new_cost = dest.distance - weight;
          if (source.distance == 0 ||
              source.distance < new_cost) {
            source.distance = new_cost;
            source.parent = dest_id;
            source.parent_weight = weight;
            ++num_changed;
          }
        } else if (source.distance > 0 && dest.distance < 0) {
          // On this edge, the source expansion meets the dest expansion.
          // i.e. at this point a path has been discovered.
          // compute the cost - 2 because source begins at cost 1, and dest begins begins at cost -1
          double path_cost = source.distance + (-dest.distance) - 2 + weight;
          auto edge_pair = std::make_pair(source_id, dest_id);

          // have we found this exact path before?
          bool path_found_before = false;
          paths_discovered_lock.lock();
          auto paths_discovered_iter = paths_discovered.find(edge_pair);
          if (paths_discovered_iter != paths_discovered.end() &&
              paths_discovered_iter->second <= path_cost) {
            // we found the exact entry with an as good or better path listed.
            path_found_before = true;
          }

          // only perform the rest of the path insertion if we have found a path before
          if (!path_found_before) {
            flex_list outpath;
            vertex_data x = source;
            outpath.push_back(x.id);
            while(1) {
              if (x.parent >= 0) {
                x = vertices[x.parent];
                outpath.push_back(x.id);
              }
              else break;
            }

            flex_list target_path;
            x = dest;
            target_path.push_back(x.id);
            while(1) {
              if (x.parent >= 0) {
                x = vertices[x.parent];
                target_path.push_back(x.id);
              }
              else break;
            }

            std::vector<flexible_type> path(outpath.rbegin(), outpath.rend());
            path.insert(path.end(), target_path.begin(), target_path.end());
            /*
             * logprogress_stream << "Discovering path of cost " << path_cost
             *                    << " meeting at "
             *                    << source.id << " " << dest.id << std::endl;
             */
            shortest_paths_accumulated_lock.lock();
            // only insert if I am better or as good as the best path I have found
            if (shortest_paths.size() == 0 || path_cost <= shortest_paths.begin()->first) {
              shortest_paths[path_cost].push_back(path);
              // trim down. any worse poaths
              if (shortest_paths.size() > 1) {
                auto iter = shortest_paths.begin(); ++iter;
                shortest_paths.erase(iter, shortest_paths.end());
              }
            }
            shortest_paths_accumulated_lock.unlock();
            paths_discovered[edge_pair] = path_cost;
            ++num_changed;
          }
          paths_discovered_lock.unlock();
        }
        scope.unlock_vertices();
      };
  while(true) {
    if(cppipc::must_cancel()) {
      log_and_throw(std::string("Toolkit cancelled by user."));
    }
    num_changed = 0;
    sgraph_compute::triple_apply(g, apply_fn, std::vector<std::string>(),
                                 std::vector<std::string>());
    logprogress_stream << "Num vertices updated: "  << num_changed.load() << std::endl;
    if (num_changed == 0) {
      break;
    }
    // perform path acceleration
    while(1) {
      size_t acceleration_changes = 0;
      for (size_t i = 0;i < vertices.size(); ++i) {
        if (vertices[i].parent != -1) {
          if (vertices[i].distance > 0 &&
              vertices[i].distance > vertices[vertices[i].parent].distance + vertices[i].parent_weight) {
            vertices[i].distance = vertices[vertices[i].parent].distance + vertices[i].parent_weight;
            ++acceleration_changes;
          }
          else if (vertices[i].distance < 0 &&
                   vertices[i].distance < vertices[vertices[i].parent].distance - vertices[i].parent_weight) {
            vertices[i].distance = vertices[vertices[i].parent].distance - vertices[i].parent_weight;
            ++acceleration_changes;
          }
        }
      }
      logprogress_stream << "Num accelerated relaxations: " << acceleration_changes << std::endl;
      if (acceleration_changes == 0) break;
    }
  }
  if (shortest_paths.size() == 0) return std::vector<flexible_type>();
  else return shortest_paths.begin()->second;
}