in flex/engines/graph_db/runtime/common/operators/retrieve/intersect.cc [26:162]
/// Left-outer intersection of two branch contexts on the vertex column `key`.
///
/// This function is called when the first branch (`ctx0`) is not optional and
/// the second branch (`ctx1`) is optional. Every row of `ctx0` is emitted at
/// least once: paired with each row of `ctx1` that belongs to the same group
/// (same offset value) and holds the same vertex, or, when there is no such
/// row, paired with the sentinel offset `std::numeric_limits<size_t>::max()`
/// that is handed to `ctx1.optional_reshuffle` (presumably producing a null
/// row there -- confirm against `Context::optional_reshuffle`).
///
/// @param ctx   Context both branches originate from. It is reshuffled with the
///              offsets of the joined `ctx0` and receives the columns of
///              `ctx0` / `ctx1` for the slots it does not already hold.
/// @param ctx0  Non-optional branch. Its offsets are used as group ids.
/// @param ctx1  Optional branch. Its offsets are used as group ids.
/// @param key   Alias of the column to join on; in both branches that column
///              is cast to `IVertexColumn`.
/// @return The merged context (moved out of `ctx`).
static Context left_outer_intersect(Context&& ctx, Context&& ctx0,
                                    Context&& ctx1, int key) {
  // Offset handed to optional_reshuffle for a left row without a partner.
  constexpr size_t kNoMatch = std::numeric_limits<size_t>::max();

  auto& idx_col0 = ctx0.get_offsets();
  auto& idx_col1 = ctx1.get_offsets();
  // NOTE(review): the casts are dereferenced unchecked, exactly as before;
  // callers must guarantee that `key` names a vertex column in both branches.
  auto& vlist0 = *(std::dynamic_pointer_cast<IVertexColumn>(ctx0.get(key)));
  auto& vlist1 = *(std::dynamic_pointer_cast<IVertexColumn>(ctx1.get(key)));

  // Fast path: ctx0 has as many rows as ctx and its offsets are 0, 1, 2, ...
  // In that case the right side can be consumed with one forward cursor.
  if (ctx0.row_num() == ctx.row_num()) {
    bool flag = true;
    for (size_t i = 0; i < idx_col0.size(); ++i) {
      if (idx_col0.get_value(i) != i) {
        flag = false;
        break;
      }
    }
    if (flag) {
      std::vector<size_t> offset0, offset1;
      // NOTE(review): `j` only ever advances, so this relies on the offsets of
      // ctx1 being non-decreasing -- confirm with the callers.
      size_t j = 0;
      for (size_t i = 0; i < ctx0.row_num(); i++) {
        bool exist = false;
        for (; j < ctx1.row_num(); ++j) {
          if (idx_col1.get_value(j) != idx_col0.get_value(i)) {
            break;
          }
          // Right rows without a value can never be a partner.
          if (vlist1.has_value(j) &&
              vlist0.get_vertex(i) == vlist1.get_vertex(j)) {
            exist = true;
            offset0.emplace_back(i);
            offset1.emplace_back(j);
          }
        }
        if (!exist) {
          offset0.emplace_back(i);
          offset1.emplace_back(kNoMatch);
        }
      }
      ctx0.reshuffle(offset0);
      ctx1.optional_reshuffle(offset1);
      ctx.reshuffle(ctx0.get_offsets().data());
      // Fill the slots ctx does not hold yet, preferring ctx0 over ctx1. The
      // sequence of set() calls is kept exactly as it was.
      for (size_t i = 0; i < ctx0.col_num() || i < ctx1.col_num(); ++i) {
        if (i < ctx0.col_num()) {
          if ((i >= ctx.col_num() || ctx.get(i) == nullptr) &&
              ctx0.get(i) != nullptr) {
            ctx.set(i, ctx0.get(i));
          }
        }
        if (i < ctx1.col_num()) {
          if ((i >= ctx.col_num() || ctx.get(i) == nullptr) &&
              ctx1.get(i) != nullptr) {
            ctx.set(i, ctx1.get(i));
          }
        } else if (i >= ctx.col_num()) {
          ctx.set(i, nullptr);
        }
      }
      // `ctx` is an rvalue-reference parameter: a plain `return ctx;` would
      // copy it under C++17, so move explicitly.
      return std::move(ctx);
    }
  }

  // General path: bucket the rows of both branches by their offset value and
  // hash-join every bucket on the vertex.
  std::vector<size_t> shuffle_offsets, shuffle_offsets_1;
  std::vector<std::vector<size_t>> vec0(ctx.row_num() + 1),
      vec1(ctx.row_num() + 1);
  for (size_t i = 0; i < idx_col0.size(); ++i) {
    vec0[idx_col0.get_value(i)].push_back(i);
  }
  for (size_t i = 0; i < idx_col1.size(); ++i) {
    vec1[idx_col1.get_value(i)].push_back(i);
  }

  const size_t len = vec0.size();
  for (size_t i = 0; i < len; ++i) {
    const auto& left_rows = vec0[i];
    const auto& right_rows = vec1[i];
    if (left_rows.empty()) {
      // Nothing to emit for this group, so there is no point in hashing it.
      continue;
    }
    if (right_rows.empty()) {
      // No candidates at all: every left row is kept without a partner.
      for (size_t j : left_rows) {
        shuffle_offsets.push_back(j);
        shuffle_offsets_1.push_back(kNoMatch);
      }
      continue;
    }

    // The hash table is always built on the right side and probed from the
    // left, so that a left row without a partner is detected while probing.
    // (A left-built table would need an extra pass to find unmatched left
    // rows; the former size comparison selecting it was always false because
    // vec0 and vec1 are constructed with the same length.)
    phmap::flat_hash_map<VertexRecord, std::vector<size_t>, VertexRecordHash>
        right_map;
    for (size_t k : right_rows) {
      // Same guard as in the fast path: skip right rows without a value.
      if (vlist1.has_value(k)) {
        right_map[vlist1.get_vertex(k)].push_back(k);
      }
    }
    for (size_t j : left_rows) {
      auto iter = right_map.find(vlist0.get_vertex(j));
      if (iter != right_map.end()) {
        for (size_t idx : iter->second) {
          shuffle_offsets.push_back(j);
          shuffle_offsets_1.push_back(idx);
        }
      } else {
        shuffle_offsets.push_back(j);
        shuffle_offsets_1.push_back(kNoMatch);
      }
    }
  }

  ctx0.reshuffle(shuffle_offsets);
  ctx1.optional_reshuffle(shuffle_offsets_1);
  ctx.reshuffle(ctx0.get_offsets().data());
  // Fill every slot ctx does not hold yet with the column of ctx0, else of
  // ctx1, else with nullptr.
  for (size_t i = 0; i < ctx0.col_num() || i < ctx1.col_num(); ++i) {
    if (i >= ctx.col_num() || ctx.get(i) == nullptr) {
      std::shared_ptr<IContextColumn> col(nullptr);
      if (i < ctx0.col_num()) {
        if (ctx0.get(i) != nullptr) {
          col = ctx0.get(i);
        }
      }
      if (col == nullptr && i < ctx1.col_num()) {
        if (ctx1.get(i) != nullptr) {
          col = ctx1.get(i);
        }
      }
      ctx.set(i, col);
    }
  }
  // See the fast path: move instead of copying the rvalue-reference parameter.
  return std::move(ctx);
}