static bl::result intersect_impl()

in flex/engines/graph_db/runtime/common/operators/retrieve/intersect.cc [163:265]


// Intersects the two branch contexts in `ctxs` on column `key` and joins the
// result back onto `ctx`.
//
// Each branch's offset column (`get_offsets()`) is used as the index of the
// `ctx` row that a branch row belongs to. Supported input: exactly two
// contexts whose `key` columns are both vertex columns (IVertexColumn).
//  - If exactly one side's key column is optional, the work is delegated to
//    `left_outer_intersect`, with the non-optional side passed first.
//  - Otherwise, for every `ctx` row, every pair (r0, r1) of ctxs[0]/ctxs[1]
//    rows that belong to that `ctx` row and hold the same vertex in `key` is
//    emitted. `ctx` is then reshuffled to line up with those pairs, and any
//    column slot `ctx` leaves empty is filled from ctxs[0] (preferred) or
//    ctxs[1].
//
// @param ctx  context the branches in `ctxs` were derived from; consumed.
// @param ctxs branch contexts to intersect; consumed.
// @param key  index of the column to intersect on.
// @return the joined context, or a not-implemented error for any other input
//         shape (not exactly two contexts, or a missing / non-vertex `key`
//         column on either side).
static bl::result<Context> intersect_impl(Context&& ctx,
                                          std::vector<Context>&& ctxs,
                                          int key) {
  // Hold the shared_ptrs (instead of references taken through temporaries)
  // so the key columns stay alive independently of the contexts.
  std::shared_ptr<IVertexColumn> vlist0;
  std::shared_ptr<IVertexColumn> vlist1;
  // size() is checked first so ctxs[0]/ctxs[1] are never indexed out of
  // range. BOTH casts are checked below, so a non-vertex column on the right
  // branch is reported as unsupported instead of being dereferenced as null.
  if (ctxs.size() == 2 && ctxs[0].get(key) != nullptr &&
      ctxs[0].get(key)->column_type() == ContextColumnType::kVertex) {
    vlist0 = std::dynamic_pointer_cast<IVertexColumn>(ctxs[0].get(key));
    vlist1 = std::dynamic_pointer_cast<IVertexColumn>(ctxs[1].get(key));
  }
  if (vlist0 == nullptr || vlist1 == nullptr) {
    LOG(ERROR) << "Currently we only support intersect on vertex columns";
    RETURN_NOT_IMPLEMENTED_ERROR(
        "Currently we only support intersect on vertex "
        "columns");
  }

  // Exactly one side optional: delegate, with the non-optional side first.
  if (!vlist0->is_optional() && vlist1->is_optional()) {
    return left_outer_intersect(std::move(ctx), std::move(ctxs[0]),
                                std::move(ctxs[1]), key);
  }
  if (vlist0->is_optional() && !vlist1->is_optional()) {
    return left_outer_intersect(std::move(ctx), std::move(ctxs[1]),
                                std::move(ctxs[0]), key);
  }
  // Neither or both optional: fall through to the inner join below.

  // Bucket each branch's row indices by the `ctx` row they belong to.
  // NOTE(review): assumes every offset value is <= ctx.row_num().
  const size_t parent_rows = ctx.row_num();
  auto group_by_parent = [parent_rows](auto& offsets) {
    std::vector<std::vector<size_t>> groups(parent_rows + 1);
    for (size_t row = 0; row < offsets.size(); ++row) {
      groups[offsets.get_value(row)].push_back(row);
    }
    return groups;
  };
  const auto groups0 = group_by_parent(ctxs[0].get_offsets());
  const auto groups1 = group_by_parent(ctxs[1].get_offsets());

  std::vector<size_t> shuffle_offsets;    // selected rows of ctxs[0]
  std::vector<size_t> shuffle_offsets_1;  // matching rows of ctxs[1]

  // Reused across parent rows to avoid re-creating the table every time.
  phmap::flat_hash_map<VertexRecord, std::vector<size_t>, VertexRecordHash>
      build_side;
  for (size_t parent = 0; parent < groups0.size(); ++parent) {
    const auto& rows0 = groups0[parent];
    const auto& rows1 = groups1[parent];
    if (rows0.empty() || rows1.empty()) {
      continue;
    }
    // Build the hash table on the smaller group and probe with the larger
    // one. This must compare the per-parent group sizes; comparing the outer
    // vectors (always parent_rows + 1 long) never selects the left side.
    const bool build_left = rows0.size() < rows1.size();
    const auto& build_rows = build_left ? rows0 : rows1;
    const auto& probe_rows = build_left ? rows1 : rows0;
    IVertexColumn& build_col = build_left ? *vlist0 : *vlist1;
    IVertexColumn& probe_col = build_left ? *vlist1 : *vlist0;

    build_side.clear();
    for (size_t row : build_rows) {
      build_side[build_col.get_vertex(row)].push_back(row);
    }
    for (size_t probe : probe_rows) {
      auto iter = build_side.find(probe_col.get_vertex(probe));
      if (iter == build_side.end()) {
        continue;
      }
      // Always emit pairs as (ctxs[0] row, ctxs[1] row), whichever side was
      // used to build the table.
      for (size_t matched : iter->second) {
        shuffle_offsets.push_back(build_left ? matched : probe);
        shuffle_offsets_1.push_back(build_left ? probe : matched);
      }
    }
  }

  ctxs[0].reshuffle(shuffle_offsets);
  ctxs[1].reshuffle(shuffle_offsets_1);
  // After the reshuffle, ctxs[0]'s offsets name the `ctx` row of each pair.
  ctx.reshuffle(ctxs[0].get_offsets().data());

  // Fill column slots that `ctx` does not populate from the branches,
  // preferring ctxs[0] when both branches provide a column at that index.
  for (size_t i = 0; i < ctxs[0].col_num() || i < ctxs[1].col_num(); ++i) {
    if (i < ctx.col_num() && ctx.get(i) != nullptr) {
      continue;
    }
    std::shared_ptr<IContextColumn> col(nullptr);
    if (i < ctxs[0].col_num()) {
      col = ctxs[0].get(i);
    }
    if (col == nullptr && i < ctxs[1].col_num()) {
      col = ctxs[1].get(i);
    }
    ctx.set(i, col);
  }
  // `ctx` is an rvalue-reference parameter: before C++20 `return ctx;` would
  // copy it, so move explicitly.
  return std::move(ctx);
}