void sgraph::commit_edge_buffer()

in src/core/storage/sgraph_data/sgraph.cpp [599:897]


void sgraph::commit_edge_buffer(size_t groupa,
                                size_t groupb,
                                sframe edges) {

  timer local_timer, global_timer;
  std::atomic<size_t> vertices_added(0);
  std::atomic<size_t> edges_added(0);

  global_timer.start();

  logstream(LOG_EMPH) << "In commit edge buffer (" << groupa << "," << groupb << ")"
                       << std::endl;

  // This function is monstrous. Basically, 3 big steps are involved:
  // 1. Keep track of the new vertex ids introduced by the incoming edges.
  // 2. Add an empty vertex for each new vertex id to its vertex partition.
  // 3. Translate the source and target ids in each edge partition into
  // the row ids of the corresponding vertices in the vertex partition, and
  // append to the existing edge partition.

  /**************************************************************************/
  /*                                                                        */
  /*                                 Step 1                                 */
  /*                                                                        */
  /**************************************************************************/
  //// First pass over all edges: aggregate the unique vertex ids
  //// seen for each vertex partition.

  local_timer.start();
  // Prepare a vertex deduplication buffer for each partition.
  typedef sarray_sorted_buffer<flexible_type> vid_buffer_type;
  std::vector<std::shared_ptr<vid_buffer_type>> vid_buffer;
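  // One deduplication buffer per vertex partition; when the edges connect two
  // different vertex groups, each group needs its own set of partitions,
  // hence 2 * m_num_partitions.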
  size_t vertex_partition_size = (groupa == groupb) ? m_num_partitions : (2 * m_num_partitions);
  for (size_t i = 0; i < vertex_partition_size; ++i) {
    vid_buffer.push_back(
        std::make_shared<vid_buffer_type>(SGRAPH_INGRESS_VID_BUFFER_SIZE,
                                          [](const flexible_type& a,
                                             const flexible_type& b){ return a < b; },
                                          true // deduplicate flag
                                          )
        );
  }
  std::vector<std::shared_ptr<vid_buffer_type>> source_vid_buffers;
  std::vector<std::shared_ptr<vid_buffer_type>> target_vid_buffers;
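  // Alias the buffers: with a single vertex group, source and target ids share
  // one buffer per partition; with two groups, they write to disjoint halves
  // of vid_buffer.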
  if (groupa == groupb) {
    for (size_t i = 0; i < vertex_partition_size; ++i) {
      source_vid_buffers.push_back(vid_buffer[i]);
      target_vid_buffers.push_back(vid_buffer[i]);
    }
  } else {
    for (size_t i = 0; i < vertex_partition_size / 2; ++i) {
      source_vid_buffers.push_back(vid_buffer[i]);
      target_vid_buffers.push_back(vid_buffer[i + m_num_partitions]);
    }
  }

  size_t src_column_idx = edges.column_index(SRC_COLUMN_NAME);
  size_t dst_column_idx = edges.column_index(DST_COLUMN_NAME);

  // Called by shuffle() for every edge row: validates both endpoint ids and
  // adds each to the deduplication buffer of its vertex partition.
  auto add_to_deduplication_buffer = [&](const std::vector<flexible_type>& row, size_t thread_id) {
    const auto& src_id = row[src_column_idx];
    const auto& dst_id = row[dst_column_idx];
    if (src_id.get_type() == flex_type_enum::UNDEFINED) {
      std::string error_message =
        std::string("source vid column cannot contain missing value. ") +
        "Please use dropna() to drop the missing value from the input and try again";
      log_and_throw(error_message);
    }
    if (dst_id.get_type() == flex_type_enum::UNDEFINED) {
      std::string error_message =
        std::string("target vid column cannot contain missing value. ") +
        "Please use dropna() to drop the missing value from the input and try again";
      log_and_throw(error_message);
    }
    size_t src_partition = get_vertex_partition(src_id);
    size_t dst_partition = get_vertex_partition(dst_id);
    source_vid_buffers[src_partition]->add(src_id, thread_id);
    target_vid_buffers[dst_partition]->add(dst_id, thread_id);
  };

  // Shuffle the edges into m_num_partitions^2 edge partitions keyed by
  // (source vertex partition, target vertex partition); the callback fills
  // the vertex id deduplication buffers along the way.
  logstream(LOG_EMPH) << "Shuffling edges ..." << std::endl;
  std::vector<sframe> edge_partitions =
    shuffle(edges, m_num_partitions * m_num_partitions,
        [&](const std::vector<flexible_type>& row) {
          return get_edge_partition(row[src_column_idx], row[dst_column_idx]);
        },
        add_to_deduplication_buffer);
  DASSERT_EQ(edge_partitions.size(), m_num_partitions * m_num_partitions);
  logstream(LOG_EMPH) << "Done shuffling edges in " << local_timer.current_time() << " secs" << std::endl;


  local_timer.start();
  logstream(LOG_EMPH) << "Aggregating unique vertices..." << std::endl;
  std::vector<sarray<flexible_type>> unique_vertex_ids(vid_buffer.size());
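  // Close each buffer and flush its sorted, deduplicated ids into an sarray.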
  parallel_for(0, vid_buffer.size(), [&](size_t i) {
    vid_buffer[i]->close();
    auto& vid_array = unique_vertex_ids[i];
    {
      vid_array.open_for_write(1);
      vid_array.set_type(m_vid_type);
      auto out = vid_array.get_output_iterator(0);
      vid_buffer[i]->sort_and_write(out);
      vid_array.close();
    }
  });
  logstream(LOG_EMPH) << "Done aggregating unique vertex in " << local_timer.current_time() << " secs" << std::endl;

  /**************************************************************************/
  /*                                                                        */
  /*                                 Step 2                                 */
  /*                                                                        */
  /**************************************************************************/
  local_timer.start();
  logstream(LOG_EMPH) << "Combine vertex data" << std::endl;

  std::vector<std::string> column_names_of_first_group = get_vertex_fields(groupa);
  std::vector<flex_type_enum> column_types_of_first_group = get_vertex_field_types(groupa);

  std::vector<std::string> column_names_of_second_group = get_vertex_fields(groupb);
  std::vector<flex_type_enum> column_types_of_second_group = get_vertex_field_types(groupb);

  // For each vertex partition, filter the incoming ids against those already
  // present and append the genuinely new ones as empty vertices.
  parallel_for(0, vid_buffer.size(), [&](size_t i) {
    size_t groupid, partitionid;
    timer partition_timer;
    std::vector<std::string> column_names;
    std::vector<flex_type_enum> column_types;
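    // Buffer indices [0, m_num_partitions) belong to groupa; the second half
    // (present only when groupa != groupb) belongs to groupb.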
    if (i < m_num_partitions) {
      groupid = groupa;
      partitionid = i;
      column_names = column_names_of_first_group;
      column_types = column_types_of_first_group;
    } else {
      groupid = groupb;
      partitionid = i - m_num_partitions;
      column_names = column_names_of_second_group;
      column_types = column_types_of_second_group;
    }

    std::vector<flexible_type> old_vids = get_vertex_ids(partitionid, groupid);
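    // Hash the ids already in this partition so the incoming ids can be
    // filtered against them.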
    google::sparse_hash_set<flexible_type, std::hash<flexible_type>>
        old_vid_set(std::make_move_iterator(old_vids.begin()),
                    std::make_move_iterator(old_vids.end()));

    sarray<flexible_type>& raw_id_sarray = unique_vertex_ids[i];
    sarray<flexible_type> new_raw_id_sarray;
    size_t new_vertices_cnt = 0;

    if (old_vids.empty()) {
      new_raw_id_sarray = raw_id_sarray;
    } else {
      new_raw_id_sarray.open_for_write(1);
      new_raw_id_sarray.set_type(m_vid_type);
      turi::copy_if(raw_id_sarray,
                        new_raw_id_sarray,
                        [&](const flexible_type& id) {
                          return old_vid_set.count(id) == 0;
                        });
      new_raw_id_sarray.close();
    }
    new_vertices_cnt = new_raw_id_sarray.size();

    sframe new_vertices;
    vertices_added += new_vertices_cnt;
    new_vertices = new_vertices.add_column(std::make_shared<sarray<flexible_type>>(new_raw_id_sarray),
                                           VID_COLUMN_NAME);
    logstream(LOG_INFO) << "Finish writing new vertices in partition " << partitionid
                        << " in " << timer.current_time() << " secs" << std::endl;

    sframe& old_vertices = vertex_partition(partitionid, groupid);
    ASSERT_TRUE(union_columns(old_vertices, new_vertices));
    old_vertices = old_vertices.append(new_vertices);

  });

  logstream(LOG_EMPH) << "Done phase 2 in " << local_timer.current_time() << " secs" << std::endl;

  /**************************************************************************/
  /*                                                                        */
  /*                                 Step 3                                 */
  /*                                                                        */
  /**************************************************************************/
  local_timer.start();
  logstream(LOG_EMPH) << "Rename id columns " << std::endl;

  std::unordered_map<std::pair<size_t, size_t>,
                     std::shared_ptr<vid_hash_map_type>> vid_hash_map_cache;
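  // Maps (vertex partition id, group id) to the vid-to-row-id lookup table
  // for that partition.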

  /**
   * Preamble function that loads the vid-to-row-id lookup tables into memory
   * for the next batch of coordinates, evicting cached tables that the batch
   * no longer needs.
   */
  std::function<void(std::vector<std::pair<size_t, size_t>>)>
    load_vid_hash_map_cache = [&](std::vector<std::pair<size_t, size_t>> coordinates) {
      std::unordered_set<std::pair<size_t, size_t>> block_to_load;
      for (auto& coord : coordinates) {
        block_to_load.insert({coord.first, groupa});
        block_to_load.insert({coord.second, groupb});
      }
      std::unordered_set<std::pair<size_t, size_t>> block_to_unload;
      for (auto& kv : vid_hash_map_cache) {
        if (block_to_load.count(kv.first)) {
          block_to_load.erase(block_to_load.find(kv.first));
        } else {
          block_to_unload.insert(kv.first);
        }
      }
      for (auto& coord : block_to_unload) {
        vid_hash_map_cache.erase(vid_hash_map_cache.find(coord));
      }
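      // Insert the new keys up front so the parallel loads below only assign
      // through existing slots and never rehash the map concurrently.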
      for (auto& coord : block_to_load) {
        vid_hash_map_cache[coord] = {};
      }
      std::vector<std::pair<size_t, size_t>> block_to_load_vec(
          block_to_load.begin(), block_to_load.end());
      parallel_for(0, block_to_load_vec.size(), [&](size_t i) {
          auto coord = block_to_load_vec[i];
          vid_hash_map_cache[coord] = fetch_vid_hash_map(coord.first, coord.second);
          });

      std::stringstream message_ss;
      message_ss <<  "Processing edge partitions: ";
      for (auto& coord: coordinates) {
        message_ss << "(" << coord.first << " , " << coord.second << ") ";
      }
      logstream(LOG_INFO) << message_ss.str() << std::endl;
      logstream(LOG_INFO) << "Number of vid maps in cache: " << vid_hash_map_cache.size() << std::endl;
    };

  //// Second pass: for every edge, translate the global vertex ids
  //// into row ids in the local vertex partitions.
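  // Visiting the edge blocks in Hilbert curve order keeps consecutive blocks
  // close in (i, j) space, so most of the vid maps needed for the next batch
  // are already in the cache.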
  sgraph_compute::hilbert_blocked_parallel_for(
      m_num_partitions,
      load_vid_hash_map_cache,
      [&](std::pair<size_t, size_t> coordinate) {
        size_t i = coordinate.first;
        size_t j = coordinate.second;
        size_t edge_partition_id = i * m_num_partitions + j;

        std::shared_ptr<vid_hash_map_type>
          vid_lookup_a = vid_hash_map_cache[{i, groupa}];
        std::shared_ptr<vid_hash_map_type>
          vid_lookup_b = vid_hash_map_cache[{j, groupb}];

        auto& new_edges = edge_partitions[edge_partition_id];
        size_t src_column_idx = new_edges.column_index(SRC_COLUMN_NAME);
        size_t dst_column_idx = new_edges.column_index(DST_COLUMN_NAME);
        auto src_column = new_edges.select_column(src_column_idx);
        auto dst_column = new_edges.select_column(dst_column_idx);
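        // Rewrite the source and target columns, replacing each global vertex
        // id with the row index of that vertex in its vertex partition.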
        std::shared_ptr<sarray<flexible_type>> new_src_column
          = std::make_shared<sarray<flexible_type>>();
        std::shared_ptr<sarray<flexible_type>> new_dst_column
          = std::make_shared<sarray<flexible_type>>();
        new_src_column->open_for_write(src_column->num_segments());
        new_src_column->set_type(flex_type_enum::INTEGER);
        transform(*src_column, *new_src_column,
            [&](const flexible_type& val) {
              return vid_lookup_a->find(val)->second;
            });
        new_src_column->close();
        new_dst_column->open_for_write(dst_column->num_segments());
        new_dst_column->set_type(flex_type_enum::INTEGER);
        transform(*dst_column, *new_dst_column,
            [&](const flexible_type& val) {
              return vid_lookup_b->find(val)->second;
            });
        new_dst_column->close();

        sframe normalized_edges({new_src_column, new_dst_column},
                                {SRC_COLUMN_NAME, DST_COLUMN_NAME});
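        // Carry over the remaining edge data columns unchanged.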
        for (auto& col : new_edges.column_names()) {
          if (col != SRC_COLUMN_NAME && col != DST_COLUMN_NAME) {
            auto data_col = new_edges.select_column(new_edges.column_index(col));
            normalized_edges = normalized_edges.add_column(data_col, col);
          }
        }

        // commit the new edge block
        sframe& old_edges = edge_partition(i, j, groupa, groupb);
        ASSERT_TRUE(union_columns(old_edges, normalized_edges));

        size_t prev_size = old_edges.num_rows();
        old_edges = old_edges.append(normalized_edges);
        edges_added += (old_edges.num_rows() - prev_size);
      });

  logstream(LOG_EMPH) << "Done in " << local_timer.current_time() << " secs" << std::endl;

  logstream(LOG_EMPH) << "Finish committing edge in "
                       << global_timer.current_time() << " secs" << std::endl;

  m_num_edges += edges_added;
  m_num_vertices += vertices_added;
}
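
For orientation, here is a minimal sketch of the partition mapping the function relies on. It assumes get_vertex_partition hashes a vertex id into [0, m_num_partitions); the helper name and the hashing are illustrative assumptions, not the library's implementation, but the row-major composition matches the edge_partition_id = i * m_num_partitions + j computation in Step 3 and the m_num_partitions^2 buckets asserted after the shuffle.

// Hypothetical sketch, not the actual turi implementation.
size_t sketch_edge_partition(const flexible_type& src,
                             const flexible_type& dst,
                             size_t num_partitions) {
  // Assumed behavior of get_vertex_partition: hash the id into a partition.
  size_t i = std::hash<flexible_type>()(src) % num_partitions;  // source partition
  size_t j = std::hash<flexible_type>()(dst) % num_partitions;  // target partition
  // Row-major composition: bucket (i, j) holds edges whose source lives in
  // vertex partition i and whose target lives in vertex partition j.
  return i * num_partitions + j;
}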