in flex/engines/graph_db/runtime/common/operators/retrieve/intersect.cc [163:265]
/**
 * Intersects two branch contexts on their vertex column `key` and merges the
 * surviving rows back into `ctx`.
 *
 * Each context in `ctxs` has an offsets column (`get_offsets()`) whose values
 * are used here as row indices into `ctx`: rows of ctxs[0] and ctxs[1] are
 * bucketed by that value, and within each bucket every pair of rows holding
 * equal vertices in column `key` becomes one output row. Both branch contexts
 * are then reshuffled to those pairs, `ctx` is reshuffled to follow ctxs[0]'s
 * offsets, and every column that is absent or null in `ctx` is taken from
 * ctxs[0], falling back to ctxs[1].
 *
 * When exactly one of the two key columns is optional the work is delegated to
 * left_outer_intersect, with the non-optional side passed as the left input.
 *
 * @param ctx  outer context whose rows are referenced by the branch offsets.
 * @param ctxs branch contexts to intersect; exactly two are supported.
 * @param key  index of the vertex column to intersect on.
 * @return the merged context, or a NotImplemented error if `ctxs` does not hold
 *         exactly two contexts or their `key` columns are not vertex columns.
 */
static bl::result<Context> intersect_impl(Context&& ctx,
                                          std::vector<Context>&& ctxs,
                                          int key) {
  if (ctxs.size() != 2) {
    LOG(ERROR) << "Currently we only support intersect on two contexts, got "
               << ctxs.size();
    RETURN_NOT_IMPLEMENTED_ERROR(
        "Currently we only support intersect on two contexts");
  }
  // Validate BOTH key columns before the unchecked dereferences below; a null
  // or non-vertex column in either branch would otherwise be dereferenced.
  for (auto& branch : ctxs) {
    const auto& col = branch.get(key);
    if (col == nullptr || col->column_type() != ContextColumnType::kVertex) {
      LOG(ERROR) << "Currently we only support intersect on vertex columns";
      RETURN_NOT_IMPLEMENTED_ERROR(
          "Currently we only support intersect on vertex "
          "columns");
    }
  }

  auto& vlist0 =
      *(std::dynamic_pointer_cast<IVertexColumn>(ctxs[0].get(key)));
  auto& vlist1 =
      *(std::dynamic_pointer_cast<IVertexColumn>(ctxs[1].get(key)));
  // Exactly one optional side: delegate to the left-outer variant with the
  // non-optional side as the left input.
  if (!vlist0.is_optional() && vlist1.is_optional()) {
    return left_outer_intersect(std::move(ctx), std::move(ctxs[0]),
                                std::move(ctxs[1]), key);
  }
  if (vlist0.is_optional() && !vlist1.is_optional()) {
    return left_outer_intersect(std::move(ctx), std::move(ctxs[1]),
                                std::move(ctxs[0]), key);
  }
  // Neither or both sides optional: plain inner intersection below.

  auto& idx_col0 = ctxs[0].get_offsets();
  auto& idx_col1 = ctxs[1].get_offsets();
  // Bucket each branch's rows by the row of `ctx` they point at.
  size_t maxi = ctx.row_num();
  std::vector<std::vector<size_t>> vec0(maxi + 1), vec1(maxi + 1);
  for (size_t i = 0; i < idx_col0.size(); ++i) {
    vec0[idx_col0.get_value(i)].push_back(i);
  }
  for (size_t i = 0; i < idx_col1.size(); ++i) {
    vec1[idx_col1.get_value(i)].push_back(i);
  }

  // Parallel output: row shuffle_offsets[n] of ctxs[0] pairs with row
  // shuffle_offsets_1[n] of ctxs[1].
  std::vector<size_t> shuffle_offsets;
  std::vector<size_t> shuffle_offsets_1;
  // One hash index reused across buckets instead of allocating one per row.
  phmap::flat_hash_map<VertexRecord, std::vector<size_t>, VertexRecordHash>
      vertex_rows;
  for (size_t i = 0; i < vec0.size(); ++i) {
    const auto& rows0 = vec0[i];
    const auto& rows1 = vec1[i];
    if (rows0.empty() || rows1.empty()) {
      continue;
    }
    vertex_rows.clear();
    // Index the smaller bucket and probe it with the larger one. This must
    // compare the per-row buckets: vec0 and vec1 themselves always have the
    // same size (maxi + 1). Within a bucket, output order follows the probe
    // side.
    if (rows0.size() < rows1.size()) {
      for (size_t j : rows0) {
        vertex_rows[vlist0.get_vertex(j)].push_back(j);
      }
      for (size_t k : rows1) {
        auto iter = vertex_rows.find(vlist1.get_vertex(k));
        if (iter == vertex_rows.end()) {
          continue;
        }
        for (size_t j : iter->second) {
          shuffle_offsets.push_back(j);
          shuffle_offsets_1.push_back(k);
        }
      }
    } else {
      for (size_t k : rows1) {
        vertex_rows[vlist1.get_vertex(k)].push_back(k);
      }
      for (size_t j : rows0) {
        auto iter = vertex_rows.find(vlist0.get_vertex(j));
        if (iter == vertex_rows.end()) {
          continue;
        }
        for (size_t k : iter->second) {
          shuffle_offsets.push_back(j);
          shuffle_offsets_1.push_back(k);
        }
      }
    }
  }

  // vlist0 / vlist1 must not be used past this point: reshuffle may replace
  // the columns they refer to.
  ctxs[0].reshuffle(shuffle_offsets);
  ctxs[1].reshuffle(shuffle_offsets_1);
  ctx.reshuffle(ctxs[0].get_offsets().data());

  // Fill every column that `ctx` lacks (or holds as null) from ctxs[0],
  // falling back to ctxs[1].
  for (size_t i = 0; i < ctxs[0].col_num() || i < ctxs[1].col_num(); ++i) {
    if (i < ctx.col_num() && ctx.get(i) != nullptr) {
      continue;
    }
    std::shared_ptr<IContextColumn> col(nullptr);
    if (i < ctxs[0].col_num()) {
      col = ctxs[0].get(i);
    }
    if (col == nullptr && i < ctxs[1].col_num()) {
      col = ctxs[1].get(i);
    }
    ctx.set(i, col);
  }
  return ctx;
}