in src/core/storage/sgraph_data/sgraph.cpp [232:431]
/**
 * Returns the edges between vertex groups \p groupa and \p groupb that match
 * the given vertex id constraints and edge field constraints.
 *
 * \param source_vids, target_vids  Parallel arrays that must have the same
 *   length. Entry i describes one id constraint: an edge matches it when its
 *   source id equals source_vids[i] and its target id equals target_vids[i].
 *   An UNDEFINED value on one side acts as a wildcard for that side. An edge
 *   passes the id check when it matches at least one entry. If every entry is
 *   UNDEFINED on both sides (which includes empty arrays), no id filtering is
 *   applied at all.
 * \param field_constraint  Map from edge field name to the required value.
 *   An UNDEFINED value places no constraint on that field; any other value
 *   must compare equal to the edge's value for that field.
 * \param groupa, groupb  Source and target vertex groups of the edges.
 *
 * \return An sframe of the matching edges. Its source and target columns hold
 *   vertex ids (typed m_vid_type) instead of the per-partition vertex indices
 *   that are stored in the edge partitions.
 */
sframe sgraph::get_edges(const std::vector<flexible_type>& source_vids,
                         const std::vector<flexible_type>& target_vids,
                         const options_map_t& field_constraint,
                         size_t groupa, size_t groupb) const {
  sframe ret;
  if (num_edges(groupa, groupb) == 0) {
    // Nothing to scan: return an empty frame that carries the schema of the
    // *requested* edge group rather than that of the default group.
    ret.open_for_write(get_edge_fields(groupa, groupb),
                       get_edge_field_types(groupa, groupb));
    ret.close();
    return ret;
  }

  const std::vector<sframe>& egroup = edge_group(groupa, groupb);

  // Resolve every field constraint to (column index, expected value) once, so
  // that the per-row check below does not have to look columns up by name.
  std::vector<std::pair<size_t, flexible_type>> value_constraint;
  for (const auto& kv : field_constraint) {
    value_constraint.emplace_back(egroup[0].column_index(kv.first), kv.second);
  }

  // Row predicate: true when the row satisfies all field constraints.
  // An UNDEFINED expected value means "any value is acceptable".
  std::function<bool(const std::vector<flexible_type>&)> satisfy_value_constraint =
      [&](const std::vector<flexible_type>& edge_data) {
        for (const auto& kv : value_constraint) {
          const flexible_type& actual = edge_data[kv.first];
          const flexible_type& expected = kv.second;
          if (expected.get_type() != flex_type_enum::UNDEFINED &&
              !(actual == expected)) {
            return false;
          }
        }
        return true;
      };

  // Column indices of the source vid and target vid.
  const size_t src_column_idx = egroup[0].column_index(SRC_COLUMN_NAME);
  const size_t dst_column_idx = egroup[0].column_index(DST_COLUMN_NAME);

  // Row transform: the stored source/target values are indices into the vertex
  // id lists of the corresponding partitions; replace them by the actual ids.
  // Kept as std::function because it is passed through boost::bind below.
  std::function<std::vector<flexible_type>(const std::vector<flexible_type>&,
                                           const std::vector<flexible_type>& source_vids,
                                           const std::vector<flexible_type>& target_vids)>
      edge_id_transform = [&](const std::vector<flexible_type>& row,
                              const std::vector<flexible_type>& source_vids,
                              const std::vector<flexible_type>& target_vids) {
        std::vector<flexible_type> ret = row;
        size_t src_idx = row[src_column_idx];
        size_t dst_idx = row[dst_column_idx];
        DASSERT_LT(src_idx, source_vids.size());
        DASSERT_LT(dst_idx, target_vids.size());
        ret[src_column_idx] = source_vids[src_idx];
        ret[dst_column_idx] = target_vids[dst_idx];
        return ret;
      };

  // Cache of {partition, group} -> List[vertex_ids].
  // The preamble function below loads the required ids into this cache before
  // the edge blocks that need them are processed.
  std::unordered_map<std::pair<size_t, size_t>, std::vector<flexible_type>>
      partition_vid_cache;

  // Preamble: given the edge block coordinates about to be processed, make the
  // cache hold exactly the vertex id lists those blocks need. Entries that are
  // already cached are kept, stale ones are dropped, missing ones are loaded.
  std::function<void(std::vector<std::pair<size_t, size_t>>)> load_partition_vids =
      [&](std::vector<std::pair<size_t, size_t>> coordinates) {
        std::set<std::pair<size_t, size_t>> pairs_to_load;
        for (const auto& c : coordinates) {
          pairs_to_load.insert({c.first, groupa});
          pairs_to_load.insert({c.second, groupb});
        }

        // Anything already cached does not need loading; anything cached but
        // no longer required gets evicted.
        std::set<std::pair<size_t, size_t>> pairs_to_unload;
        for (const auto& kv : partition_vid_cache) {
          if (pairs_to_load.erase(kv.first) == 0) {
            pairs_to_unload.insert(kv.first);
          }
        }
        for (const auto& key : pairs_to_unload) {
          partition_vid_cache.erase(key);
        }

        // Create all slots up front, single threaded, so that the parallel
        // section below never inserts into (and thus never rehashes) the map.
        for (const auto& key : pairs_to_load) {
          partition_vid_cache[key] = {};
        }

        const std::vector<std::pair<size_t, size_t>> pairs_to_load_vec(
            pairs_to_load.begin(), pairs_to_load.end());
        parallel_for(0, pairs_to_load_vec.size(), [&](size_t i) {
          const auto& coord = pairs_to_load_vec[i];
          // at() instead of operator[]: concurrent at() calls on distinct
          // existing keys are race free, whereas operator[] on an unordered
          // container is formally a modifying operation.
          partition_vid_cache.at(coord) = get_vertex_ids(coord.first, coord.second);
        });
      };

  // Configure the id constraints.
  // The two id lists are walked in lockstep, so they have to be equally long;
  // otherwise target_vids would be read out of bounds.
  ASSERT_EQ(source_vids.size(), target_vids.size());

  bool match_all_vertices = true;
  for (size_t i = 0; i < source_vids.size(); ++i) {
    const bool both_wild =
        source_vids[i].get_type() == flex_type_enum::UNDEFINED &&
        target_vids[i].get_type() == flex_type_enum::UNDEFINED;
    if (!both_wild) {
      match_all_vertices = false;
      break;
    }
  }

  // One output frame per (source partition, target partition) edge block,
  // stored row-major; each block is written by exactly one task.
  std::vector<sframe> out_edge_blocks(m_num_partitions * m_num_partitions);

  if (match_all_vertices) {
    // Case 1: there are no source or target id constraints.
    sgraph_compute::hilbert_blocked_parallel_for(
        get_num_partitions(),
        load_partition_vids,
        [&](std::pair<size_t, size_t> coordinate) {
          const size_t i = coordinate.first;
          const size_t j = coordinate.second;
          const std::vector<flexible_type>& src_partition_vids =
              partition_vid_cache.at({i, groupa});
          const std::vector<flexible_type>& dst_partition_vids =
              partition_vid_cache.at({j, groupb});

          sframe edge_sframe = edge_partition(i, j, groupa, groupb);

          // Same schema as the stored block, except that the id columns now
          // carry vertex ids instead of indices.
          std::vector<flex_type_enum> out_column_types = edge_sframe.column_types();
          out_column_types[src_column_idx] = m_vid_type;
          out_column_types[dst_column_idx] = m_vid_type;

          sframe out_sframe;
          out_sframe.open_for_write(edge_sframe.column_names(), out_column_types,
                                    "", edge_sframe.num_segments());
          copy_transform_if(edge_sframe, out_sframe, satisfy_value_constraint,
                            boost::bind(edge_id_transform, _1,
                                        boost::cref(src_partition_vids),
                                        boost::cref(dst_partition_vids)));
          out_sframe.close();
          out_edge_blocks[i * m_num_partitions + j] = std::move(out_sframe);
        });
  } else {
    // Case 2: reorganize the id constraints into partitions.

    // Ids whose counterpart is a wildcard, bucketed by the id's partition:
    //  wild_source_vids[p]: sources in partition p paired with any target.
    //  wild_target_vids[p]: targets in partition p paired with any source.
    std::vector<std::unordered_set<flexible_type>> wild_source_vids(m_num_partitions);
    std::vector<std::unordered_set<flexible_type>> wild_target_vids(m_num_partitions);

    // Fully specified constraints:
    // map from (src_partition, dst_partition) -> set<(src_id, dst_id)>.
    std::unordered_map<std::pair<size_t, size_t>,
                       std::unordered_set<std::pair<flexible_type, flexible_type>>>
        vid_constraints;
    // Pre-create every bucket so the parallel section only ever reads the map.
    for (size_t i = 0; i < m_num_partitions; ++i) {
      for (size_t j = 0; j < m_num_partitions; ++j) {
        vid_constraints[{i, j}] = {};
      }
    }

    // NOTE(review): an entry that is UNDEFINED on both sides takes the first
    // branch and is recorded as the target id UNDEFINED; it only behaves as
    // "match everything" when *all* entries are like that (Case 1). Confirm
    // that this is intended.
    for (size_t i = 0; i < source_vids.size(); ++i) {
      const flexible_type& source = source_vids[i];
      const flexible_type& target = target_vids[i];
      const size_t source_pid = source.hash() % m_num_partitions;
      const size_t target_pid = target.hash() % m_num_partitions;
      if (source.get_type() == flex_type_enum::UNDEFINED) {
        wild_target_vids[target_pid].insert(target);
      } else if (target.get_type() == flex_type_enum::UNDEFINED) {
        wild_source_vids[source_pid].insert(source);
      } else {
        vid_constraints[{source_pid, target_pid}].insert({source, target});
      }
    }

    sgraph_compute::hilbert_blocked_parallel_for(
        get_num_partitions(),
        load_partition_vids,
        [&](std::pair<size_t, size_t> coordinate) {
          const size_t i = coordinate.first;
          const size_t j = coordinate.second;
          const std::vector<flexible_type>& src_partition_vids =
              partition_vid_cache.at({i, groupa});
          const std::vector<flexible_type>& dst_partition_vids =
              partition_vid_cache.at({j, groupb});

          sframe edge_sframe = edge_partition(i, j, groupa, groupb);

          // Same schema as the stored block, except that the id columns now
          // carry vertex ids instead of indices.
          std::vector<flex_type_enum> out_column_types = edge_sframe.column_types();
          out_column_types[src_column_idx] = m_vid_type;
          out_column_types[dst_column_idx] = m_vid_type;

          sframe out_sframe;
          out_sframe.open_for_write(edge_sframe.column_names(), out_column_types,
                                    "", edge_sframe.num_segments());

          // The constraint sets relevant to this block are fixed for all of
          // its rows, so look them up once instead of once per row.
          const std::unordered_set<flexible_type>& block_wild_sources =
              wild_source_vids[i];
          const std::unordered_set<flexible_type>& block_wild_targets =
              wild_target_vids[j];
          const std::unordered_set<std::pair<flexible_type, flexible_type>>&
              block_pair_constraints = vid_constraints.at({i, j});

          // The filter function checks the id constraints first and the value
          // constraints only for rows that pass them.
          std::function<bool(const std::vector<flexible_type>&)> filter_fn =
              [&](const std::vector<flexible_type>& row) {
                size_t src_idx = row[src_column_idx];
                size_t dst_idx = row[dst_column_idx];
                const flexible_type& source = src_partition_vids[src_idx];
                const flexible_type& target = dst_partition_vids[dst_idx];
                // The (source, target) pair is only materialized when neither
                // wildcard set already accepts the row.
                const bool id_match =
                    block_wild_sources.count(source) > 0 ||
                    block_wild_targets.count(target) > 0 ||
                    block_pair_constraints.count(
                        std::pair<flexible_type, flexible_type>(source, target)) > 0;
                return id_match && satisfy_value_constraint(row);
              };

          copy_transform_if(edge_sframe, out_sframe, filter_fn,
                            boost::bind(edge_id_transform, _1,
                                        boost::cref(src_partition_vids),
                                        boost::cref(dst_partition_vids)));
          out_sframe.close();
          out_edge_blocks[i * m_num_partitions + j] = std::move(out_sframe);
        });
  }

  // Stitch the per-block results together in block order.
  for (auto& sf : out_edge_blocks) {
    ret = ret.append(sf);
  }
  return ret;
}