in src/core/storage/sgraph_data/sgraph.cpp [599:897]
void sgraph::commit_edge_buffer(size_t groupa,
size_t groupb,
sframe edges) {
timer local_timer, global_timer;
std::atomic<size_t> vertices_added(0);
std::atomic<size_t> edges_added(0);
global_timer.start();
logstream(LOG_EMPH) << "In commit edge buffer (" << groupa << "," << groupb << ")"
<< std::endl;
// This function is monsterous. Bascially, 3 big steps are involved.
// 1. Keeping track of the new vertex id that got introduced by the incoming edges.
// 2. Add an empty vertex for each new vertex id to its vertex partition.
// 3. Translate the source and target ids in each edge partition to
// be the row id of the corresponding vertex in the vertex partition, and append
// to the existing edge partition.
/**************************************************************************/
/* */
/* Step 1 */
/* */
/**************************************************************************/
//// First pass of all edge partitions, we aggregate all unique vertices
//// seen in each vertex partition.
local_timer.start();
// Prepare vertex dedupliication buffer for each partition
typedef sarray_sorted_buffer<flexible_type> vid_buffer_type;
std::vector<std::shared_ptr<vid_buffer_type>> vid_buffer;
size_t vertex_partition_size = (groupa == groupb) ? m_num_partitions : (2 * m_num_partitions);
for (size_t i = 0; i < vertex_partition_size; ++i) {
vid_buffer.push_back(
std::make_shared<vid_buffer_type>(SGRAPH_INGRESS_VID_BUFFER_SIZE,
[](const flexible_type& a,
const flexible_type& b){ return a < b; },
true // deduplicate flag
)
);
}
std::vector<std::shared_ptr<vid_buffer_type>> source_vid_buffers;
std::vector<std::shared_ptr<vid_buffer_type>> target_vid_buffers;
if (groupa == groupb) {
for (size_t i = 0; i < vertex_partition_size; ++i) {
source_vid_buffers.push_back(vid_buffer[i]);
target_vid_buffers.push_back(vid_buffer[i]);
}
} else {
for (size_t i = 0; i < vertex_partition_size / 2; ++i) {
source_vid_buffers.push_back(vid_buffer[i]);
target_vid_buffers.push_back(vid_buffer[i + m_num_partitions]);
}
}
size_t src_column_idx = edges.column_index(SRC_COLUMN_NAME);
size_t dst_column_idx = edges.column_index(DST_COLUMN_NAME);
auto add_to_deduplication_buffer = [&](const std::vector<flexible_type>& row, size_t thread_id) {
const auto& src_id = row[src_column_idx];
const auto& dst_id = row[dst_column_idx];
if (src_id.get_type() == flex_type_enum::UNDEFINED) {
std::string error_message =
std::string("source vid column cannot contain missing value. ") +
"Please use dropna() to drop the missing value from the input and try again";
log_and_throw(error_message);
}
if (dst_id.get_type() == flex_type_enum::UNDEFINED) {
std::string error_message =
std::string("target vid column cannot contain missing value. ") +
"Please use dropna() to drop the missing value from the input and try again";
log_and_throw(error_message);
}
size_t src_partition = get_vertex_partition(src_id);
size_t dst_partition = get_vertex_partition(dst_id);
source_vid_buffers[src_partition]->add(src_id, thread_id);
target_vid_buffers[dst_partition]->add(dst_id, thread_id);
};
// prepare the vertex id aggregate buffer for each partition.
logstream(LOG_EMPH) << "Shuffling edges ..." << std::endl;
std::vector<sframe> edge_partitions =
shuffle(edges, m_num_partitions * m_num_partitions,
[&](const std::vector<flexible_type>& row) {
return get_edge_partition(row[src_column_idx], row[dst_column_idx]);
},
add_to_deduplication_buffer);
DASSERT_EQ(edge_partitions.size(), m_num_partitions * m_num_partitions);
logstream(LOG_EMPH) << "Done shuffling edges in " << local_timer.current_time() << " secs" << std::endl;
local_timer.start();
logstream(LOG_EMPH) << "Aggregating unique vertices..." << std::endl;
std::vector<sarray<flexible_type>> unique_vertex_ids(vid_buffer.size());
parallel_for(0, vid_buffer.size(), [&](size_t i) {
vid_buffer[i]->close();
auto& vid_array = unique_vertex_ids[i];
{
vid_array.open_for_write(1);
vid_array.set_type(m_vid_type);
auto out = vid_array.get_output_iterator(0);
vid_buffer[i]->sort_and_write(out);
vid_array.close();
}
});
logstream(LOG_EMPH) << "Done aggregating unique vertex in " << local_timer.current_time() << " secs" << std::endl;
/**************************************************************************/
/* */
/* Step 2 */
/* */
/**************************************************************************/
local_timer.start();
logstream(LOG_EMPH) << "Combine vertex data" << std::endl;
std::vector<std::string> column_names_of_first_group = get_vertex_fields(groupa);
std::vector<flex_type_enum> column_types_of_first_group = get_vertex_field_types(groupa);
std::vector<std::string> column_names_of_second_group = get_vertex_fields(groupb);
std::vector<flex_type_enum> column_types_of_second_group = get_vertex_field_types(groupb);
parallel_for(0, vid_buffer.size(), [&](size_t i) {
// for (size_t i = 0; i < vid_buffer.size(); ++i) {
size_t groupid, partitionid;
timer timer;
std::vector<std::string> column_names;
std::vector<flex_type_enum> column_types;
if (i < m_num_partitions) {
groupid = groupa;
partitionid = i;
column_names = column_names_of_first_group;
column_types = column_types_of_first_group;
} else {
groupid = groupb;
partitionid = i - m_num_partitions;
column_names = column_names_of_second_group;
column_types = column_types_of_second_group;
}
std::vector<flexible_type> old_vids = get_vertex_ids(partitionid, groupid);
google::sparse_hash_set<flexible_type, std::hash<flexible_type>>
old_vid_set(std::make_move_iterator(old_vids.begin()),
std::make_move_iterator(old_vids.end()));
sarray<flexible_type>& raw_id_sarray = unique_vertex_ids[i];
sarray<flexible_type> new_raw_id_sarray;
size_t new_vertices_cnt = 0;
if (old_vids.empty()) {
new_raw_id_sarray = raw_id_sarray;
} else {
new_raw_id_sarray.open_for_write(1);
new_raw_id_sarray.set_type(m_vid_type);
turi::copy_if(raw_id_sarray,
new_raw_id_sarray,
[&](const flexible_type& id) {
return old_vid_set.count(id) == 0;
});
new_raw_id_sarray.close();
}
new_vertices_cnt = new_raw_id_sarray.size();
sframe new_vertices;
vertices_added += new_vertices_cnt;
new_vertices = new_vertices.add_column(std::make_shared<sarray<flexible_type>>(new_raw_id_sarray),
VID_COLUMN_NAME);
logstream(LOG_INFO) << "Finish writing new vertices in partition " << partitionid
<< " in " << timer.current_time() << " secs" << std::endl;
sframe& old_vertices = vertex_partition(partitionid, groupid);
ASSERT_TRUE(union_columns(old_vertices, new_vertices));
old_vertices = old_vertices.append(new_vertices);
// debug print
// std::cerr << "New vertices in partition " << i << ":\n";
// new_vertices.debug_print();
// }
});
logstream(LOG_EMPH) << "Done phase 2 in " << local_timer.current_time() << " secs" << std::endl;
/**************************************************************************/
/* */
/* Step 3 */
/* */
/**************************************************************************/
local_timer.start();
logstream(LOG_EMPH) << "Rename id columns " << std::endl;
std::unordered_map<std::pair<size_t, size_t>,
std::shared_ptr<vid_hash_map_type>> vid_hash_map_cache;
/**
* Preamble function to load the vid to row id lookup table in memory for the
* next batch of coordinates.
*/
std::function<void(std::vector<std::pair<size_t, size_t>>)>
load_vid_hash_map_cache = [&](std::vector<std::pair<size_t, size_t>> coordinates) {
std::unordered_set<std::pair<size_t, size_t>> block_to_load;
for (auto& corrd : coordinates) {
block_to_load.insert({corrd.first, groupa});
block_to_load.insert({corrd.second, groupb});
}
std::unordered_set<std::pair<size_t, size_t>> block_to_unload;
for (auto& kv : vid_hash_map_cache) {
if (block_to_load.count(kv.first)) {
block_to_load.erase(block_to_load.find(kv.first));
} else {
block_to_unload.insert(kv.first);
}
}
for (auto& coord : block_to_unload) {
vid_hash_map_cache.erase(vid_hash_map_cache.find(coord));
}
for (auto& coord : block_to_load) {
vid_hash_map_cache[coord] = {};
}
std::vector<std::pair<size_t, size_t>> block_to_load_vec(
block_to_load.begin(), block_to_load.end());
parallel_for(0, block_to_load_vec.size(), [&](size_t i) {
auto coord = block_to_load_vec[i];
vid_hash_map_cache[coord] = fetch_vid_hash_map(coord.first, coord.second);
});
std::stringstream message_ss;
message_ss << "Processing edge partitions: ";
for (auto& coord: coordinates) {
message_ss << "(" << coord.first << " , " << coord.second << ") ";
}
logstream(LOG_INFO) << message_ss.str() << std::endl;
logstream(LOG_INFO) << "Number of vid maps in cache: " << vid_hash_map_cache.size() << std::endl;
};
//// Second pass for every edge, we rename the global vertex id
//// into the rowid in the local vertex partition.
sgraph_compute::hilbert_blocked_parallel_for(
m_num_partitions,
load_vid_hash_map_cache,
[&](std::pair<size_t, size_t> coordinate) {
size_t i = coordinate.first;
size_t j = coordinate.second;
size_t edge_partition_id = i * m_num_partitions + j;
std::shared_ptr<vid_hash_map_type>
vid_lookup_a = vid_hash_map_cache[{i, groupa}];
std::shared_ptr<vid_hash_map_type>
vid_lookup_b = vid_hash_map_cache[{j, groupb}];
auto& new_edges = edge_partitions[edge_partition_id];
size_t src_column_idx = new_edges.column_index(SRC_COLUMN_NAME);
size_t dst_column_idx = new_edges.column_index(DST_COLUMN_NAME);
auto src_column = new_edges.select_column(src_column_idx);
auto dst_column = new_edges.select_column(dst_column_idx);
std::shared_ptr<sarray<flexible_type>> new_src_column
= std::make_shared<sarray<flexible_type>>();
std::shared_ptr<sarray<flexible_type>> new_dst_column
= std::make_shared<sarray<flexible_type>>();
new_src_column->open_for_write(src_column->num_segments());
new_src_column->set_type(flex_type_enum::INTEGER);
transform(*src_column, *new_src_column,
[&](const flexible_type& val) {
return vid_lookup_a->find(val)->second;
});
new_src_column->close();
new_dst_column->open_for_write(dst_column->num_segments());
new_dst_column->set_type(flex_type_enum::INTEGER);
transform(*dst_column, *new_dst_column,
[&](const flexible_type& val) {
return vid_lookup_b->find(val)->second;
});
new_dst_column->close();
sframe normalized_edges({new_src_column, new_dst_column},
{SRC_COLUMN_NAME, DST_COLUMN_NAME});
for (auto& col : new_edges.column_names()) {
if (col != SRC_COLUMN_NAME && col != DST_COLUMN_NAME) {
auto data_col = new_edges.select_column(new_edges.column_index(col));
normalized_edges = normalized_edges.add_column(data_col, col);
}
}
// commit the new edge block
sframe& old_edges = edge_partition(i, j, groupa, groupb);
ASSERT_TRUE(union_columns(old_edges, normalized_edges));
size_t prev_size = old_edges.num_rows();
old_edges = old_edges.append(normalized_edges);
edges_added += (old_edges.num_rows() - prev_size);
});
logstream(LOG_EMPH) << "Done in " << local_timer.current_time() << " secs" << std::endl;
logstream(LOG_EMPH) << "Finish committing edge in "
<< global_timer.current_time() << " secs" << std::endl;
m_num_edges += edges_added;
m_num_vertices += vertices_added;
}