sframe sgraph::get_edges()

in src/core/storage/sgraph_data/sgraph.cpp [232:431]


sframe sgraph::get_edges(const std::vector<flexible_type>& source_vids,
                         const std::vector<flexible_type>& target_vids,
                         const options_map_t& field_constraint,
                         size_t groupa, size_t groupb) const {
  sframe ret;
  if (num_edges(groupa, groupb) == 0) {
    ret.open_for_write(get_edge_fields(), get_edge_field_types());
    ret.close();
    return ret;
  }

  std::vector<std::string> efields = get_edge_fields(groupa, groupb);
  const std::vector<sframe>& egroup = edge_group(groupa, groupb);

  // Configure the field constraints
  std::vector< std::pair<size_t, flexible_type> > value_constraint;
  for (const auto& kv : field_constraint) {
    value_constraint.push_back({egroup[0].column_index(kv.first), kv.second});
  }

  // lambda for checking value constraint
  std::function<bool(const std::vector<flexible_type>&)> satisfy_value_constraint =
      [&](const std::vector<flexible_type>& edge_data) {
        for(const auto& kv : value_constraint) {
          const flexible_type& actual = edge_data[kv.first];
          const flexible_type& expected = kv.second;
          if (!(
                (expected.get_type() == flex_type_enum::UNDEFINED)
                || (actual == expected))) {
            return false;
          }
        }
        return true;
      };

  // column indices of the source vid and target vid
  size_t src_column_idx = egroup[0].column_index(SRC_COLUMN_NAME);
  size_t dst_column_idx = egroup[0].column_index(DST_COLUMN_NAME);

  // lambda for transform edge ids
  std::function<std::vector<flexible_type>(const std::vector<flexible_type>&,
                                           const std::vector<flexible_type>& source_vids,
                                           const std::vector<flexible_type>& target_vids)>
      edge_id_transform = [&](const std::vector<flexible_type>& row,
                              const std::vector<flexible_type>& source_vids,
                              const std::vector<flexible_type>& target_vids) {
        std::vector<flexible_type> ret = row;
        size_t src_idx = row[src_column_idx];
        size_t dst_idx = row[dst_column_idx];
        DASSERT_LT(src_idx, source_vids.size());
        DASSERT_LT(dst_idx, target_vids.size());
        ret[src_column_idx] = source_vids[src_idx];
        ret[dst_column_idx] = target_vids[dst_idx];
        return ret;
      };

  // Cache of {partition, group} -> List[vertex_ids]
  // Preamble functions will load the requied ids into this cache.
  std::unordered_map<std::pair<size_t, size_t>, std::vector<flexible_type>>
    partition_vid_cache;

  std::function<void(std::vector<std::pair<size_t, size_t>>)> load_partition_vids =
    [&](std::vector<std::pair<size_t, size_t>> coordinates) {
      std::set<std::pair<size_t, size_t>> pairs_to_load;
      std::set<std::pair<size_t, size_t>> pairs_to_unload;
      for (auto& c: coordinates) {
        pairs_to_load.insert({c.first, groupa});
        pairs_to_load.insert({c.second, groupb});
      }
      for (auto& kv: partition_vid_cache) {
        if (pairs_to_load.count(kv.first)) {
          pairs_to_load.erase(pairs_to_load.find(kv.first));
        } else {
          pairs_to_unload.insert(kv.first);
        }
      }
      for (auto& pair: pairs_to_unload) {
        partition_vid_cache.erase(partition_vid_cache.find(pair));
      }
      for (auto& pair: pairs_to_load) {
        partition_vid_cache[pair] = {};
      }
      std::vector<std::pair<size_t, size_t>> pairs_to_load_vec(
          pairs_to_load.begin(), pairs_to_load.end());
      parallel_for(0, pairs_to_load_vec.size(), [&](size_t i) {
        auto coord = pairs_to_load_vec[i];
        partition_vid_cache[coord] = get_vertex_ids(coord.first, coord.second);
      });
    };

  // Configure the id constraints
  bool match_all_vertices = true;
  for (size_t i = 0; i < source_vids.size(); ++i) {
    if (!((source_vids[i].get_type() == flex_type_enum::UNDEFINED)
        && (target_vids[i].get_type() == flex_type_enum::UNDEFINED))) {
      match_all_vertices = false;
      break;
    }
  }

  std::vector<sframe> out_edge_blocks(m_num_partitions * m_num_partitions);
  // Case 1: there is no source or target id constraints.
  if (match_all_vertices) {
    sgraph_compute::hilbert_blocked_parallel_for(
        get_num_partitions(),
        load_partition_vids,
        [&](std::pair<size_t, size_t> coordinate) {
          size_t i = coordinate.first;
          size_t j = coordinate.second;
          const std::vector<flexible_type>& src_partition_vids = partition_vid_cache.at({i, groupa});
          const std::vector<flexible_type>& dst_partition_vids = partition_vid_cache.at({j, groupb});

          sframe edge_sframe = edge_partition(i, j, groupa, groupb);
          std::vector<flex_type_enum> out_column_types = edge_sframe.column_types();
          out_column_types[src_column_idx] = m_vid_type;
          out_column_types[dst_column_idx] = m_vid_type;

          sframe out_sframe;
          out_sframe.open_for_write(edge_sframe.column_names(), out_column_types,
                                    "", edge_sframe.num_segments());
          copy_transform_if(edge_sframe, out_sframe, satisfy_value_constraint,
                            boost::bind(edge_id_transform, _1,
                                        boost::cref(src_partition_vids),
                                        boost::cref(dst_partition_vids)));
          out_sframe.close();
          out_edge_blocks[i * m_num_partitions + j] = std::move(out_sframe);
        });
  // Case 2: reorganize the id constraints into partitions.
  } else {
    // separate the source/target vertices that are matching wildcards.
    std::vector<std::unordered_set<flexible_type>> wild_source_vids(m_num_partitions);
    std::vector<std::unordered_set<flexible_type>> wild_target_vids(m_num_partitions);
    // normal source target id constraints:  map from (src_partition, dst_partition) -> set<(src_id, dst_id>
    std::unordered_map<std::pair<size_t, size_t>, std::unordered_set<std::pair<flexible_type, flexible_type>>>
      vid_constraints;
    for (size_t i = 0; i < m_num_partitions; ++i) {
      for (size_t j = 0; j < m_num_partitions; ++j) {
        vid_constraints[{i,j}] = {};
      }
    }

    for (size_t i = 0; i < source_vids.size(); ++i) {
      const flexible_type& source = source_vids[i];
      const flexible_type& target = target_vids[i];
      size_t source_pid = source.hash() % m_num_partitions;
      size_t target_pid = target.hash() % m_num_partitions;
      if (source.get_type() == flex_type_enum::UNDEFINED) {
        wild_target_vids[target_pid].insert(target);
      } else if (target.get_type() == flex_type_enum::UNDEFINED) {
        wild_source_vids[source_pid].insert(source);
      } else {
        vid_constraints[{source_pid, target_pid}].insert({source, target});
      }
    }
    sgraph_compute::hilbert_blocked_parallel_for(
        get_num_partitions(),
        load_partition_vids,
        [&](std::pair<size_t, size_t> coordinate) {
        size_t i = coordinate.first;
        size_t j = coordinate.second;
        const std::vector<flexible_type>& src_partition_vids = partition_vid_cache.at({i, groupa});
        const std::vector<flexible_type>& dst_partition_vids = partition_vid_cache.at({j, groupb});
        sframe edge_sframe = edge_partition(i, j, groupa, groupb);

        std::vector<flex_type_enum> out_column_types = edge_sframe.column_types();
        out_column_types[src_column_idx] = m_vid_type;
        out_column_types[dst_column_idx] = m_vid_type;

        sframe out_sframe;
        out_sframe.open_for_write(edge_sframe.column_names(), out_column_types,
                                  "", edge_sframe.num_segments());

        // The filter function checks the id constraints and then value constraints
        std::function<bool(const std::vector<flexible_type>&)> filter_fn =
            [&](const std::vector<flexible_type>& row) {
              size_t src_idx = row[src_column_idx];
              size_t dst_idx = row[dst_column_idx];
              const flexible_type& source = src_partition_vids[src_idx];
              const flexible_type& target = dst_partition_vids[dst_idx];
              std::pair<size_t, size_t> partition_addr{i, j};
              std::pair<flexible_type, flexible_type> source_target_pair{source, target};
              if ((wild_source_vids[i].count(source) || wild_target_vids[j].count(target)) ||
                  (vid_constraints.at(partition_addr).count(source_target_pair) > 0)) {
               return satisfy_value_constraint(row);
              }
              return false;
            };

        copy_transform_if(edge_sframe, out_sframe, filter_fn,
                          boost::bind(edge_id_transform, _1,
                                      boost::cref(src_partition_vids),
                                      boost::cref(dst_partition_vids)));
        out_sframe.close();
        out_edge_blocks[i * m_num_partitions + j] = std::move(out_sframe);
    });
  }
  for (auto& sf : out_edge_blocks)
    ret = ret.append(sf);
  return ret;
}