bl::result ScanOprBuilder::Build()

in flex/engines/graph_db/runtime/execute/ops/retrieve/scan.cc [471:637]


bl::result<ReadOpBuildResultT> ScanOprBuilder::Build(
    const gs::Schema& schema, const ContextMeta& ctx_meta,
    const physical::PhysicalPlan& plan, int op_idx) {
  ContextMeta ret_meta;
  int alias = -1;
  if (plan.plan(op_idx).opr().scan().has_alias()) {
    alias = plan.plan(op_idx).opr().scan().alias().value();
  }
  ret_meta.set(alias);
  auto scan_opr = plan.plan(op_idx).opr().scan();
  if (scan_opr.scan_opt() != physical::Scan::VERTEX) {
    LOG(ERROR) << "Currently only support scan vertex";
    return std::make_pair(nullptr, ret_meta);
  }
  if (!scan_opr.has_params()) {
    LOG(ERROR) << "Scan operator should have params";
    return std::make_pair(nullptr, ret_meta);
  }

  ScanParams scan_params;
  scan_params.alias = scan_opr.has_alias() ? scan_opr.alias().value() : -1;
  scan_params.limit = std::numeric_limits<int32_t>::max();
  if (scan_opr.params().has_limit()) {
    auto& limit_range = scan_opr.params().limit();
    if (limit_range.lower() != 0) {
      LOG(FATAL) << "Scan with lower limit expect 0, but got "
                 << limit_range.lower();
    }
    if (limit_range.upper() > 0) {
      scan_params.limit = limit_range.upper();
    }
  }
  for (auto& table : scan_opr.params().tables()) {
    // bug here, exclude invalid vertex label id
    if (schema.vertex_label_num() <= table.id()) {
      continue;
    }
    scan_params.tables.emplace_back(table.id());
  }
  if (scan_opr.has_idx_predicate()) {
    bool scan_oid = false;
    if (!ScanUtils::check_idx_predicate(scan_opr, scan_oid)) {
      LOG(ERROR) << "Index predicate is not supported"
                 << scan_opr.DebugString();
      return std::make_pair(nullptr, ret_meta);
    }
    // only one label and without predicate
    if (scan_params.tables.size() == 1 && scan_oid &&
        (!scan_opr.params().has_predicate())) {
      const auto& pks = schema.get_vertex_primary_key(scan_params.tables[0]);
      const auto& [type, _, __] = pks[0];
      auto oids =
          ScanUtils::parse_ids_with_type(type, scan_opr.idx_predicate());
      return std::make_pair(
          std::make_unique<FilterOidsWithoutPredOpr>(scan_params, oids),
          ret_meta);
    }

    // without predicate
    if (!scan_opr.params().has_predicate()) {
      if (!scan_oid) {
        auto gids = ScanUtils::parse_ids_with_type(PropertyType::kInt64,
                                                   scan_opr.idx_predicate());
        return std::make_pair(
            std::make_unique<FilterGidsWithoutPredOpr>(scan_params, gids),
            ret_meta);
      } else {
        std::vector<std::function<std::vector<Any>(ParamsType)>> oids;
        std::set<int> types;
        for (auto& table : scan_params.tables) {
          const auto& pks = schema.get_vertex_primary_key(table);
          const auto& [type, _, __] = pks[0];
          int type_impl = static_cast<int>(type.type_enum);
          if (types.find(type_impl) == types.end()) {
            types.insert(type_impl);
            const auto& oid =
                ScanUtils::parse_ids_with_type(type, scan_opr.idx_predicate());
            oids.emplace_back(oid);
          }
        }
        if (types.size() == 1) {
          return std::make_pair(
              std::make_unique<FilterOidsWithoutPredOpr>(scan_params, oids[0]),
              ret_meta);
        } else {
          return std::make_pair(
              std::make_unique<FilterMultiTypeOidsWithoutPredOpr>(scan_params,
                                                                  oids),
              ret_meta);
        }
      }
    } else {
      auto sp_vertex_pred =
          parse_special_vertex_predicate(scan_opr.params().predicate());
      if (scan_oid) {
        std::set<int> types;
        std::vector<std::function<std::vector<Any>(ParamsType)>> oids;
        for (auto& table : scan_params.tables) {
          const auto& pks = schema.get_vertex_primary_key(table);
          const auto& [type, _, __] = pks[0];
          auto type_impl = static_cast<int>(type.type_enum);
          if (types.find(type_impl) == types.end()) {
            auto oid =
                ScanUtils::parse_ids_with_type(type, scan_opr.idx_predicate());
            types.insert(type_impl);
            oids.emplace_back(oid);
          }
        }
        if (types.size() == 1) {
          if (sp_vertex_pred.has_value()) {
            return std::make_pair(std::make_unique<FilterOidsSPredOpr>(
                                      scan_params, oids[0], *sp_vertex_pred),
                                  ret_meta);
          } else {
            return std::make_pair(
                std::make_unique<FilterOidsGPredOpr>(
                    scan_params, oids[0], scan_opr.params().predicate()),
                ret_meta);
          }
        } else {
          if (sp_vertex_pred.has_value()) {
            return std::make_pair(std::make_unique<FilterOidsMultiTypeSPredOpr>(
                                      scan_params, oids, *sp_vertex_pred),
                                  ret_meta);
          } else {
            return std::make_pair(
                std::make_unique<FilterOidsMultiTypeGPredOpr>(
                    scan_params, oids, scan_opr.params().predicate()),
                ret_meta);
          }
        }

      } else {
        auto gids = ScanUtils::parse_ids_with_type(PropertyType::kInt64,
                                                   scan_opr.idx_predicate());
        if (sp_vertex_pred.has_value()) {
          return std::make_pair(std::make_unique<FilterGidsSPredOpr>(
                                    scan_params, gids, *sp_vertex_pred),
                                ret_meta);
        } else {
          return std::make_pair(
              std::make_unique<FilterGidsGPredOpr>(
                  scan_params, gids, scan_opr.params().predicate()),
              ret_meta);
        }
      }
    }

  } else {
    if (scan_opr.params().has_predicate()) {
      auto sp_vertex_pred =
          parse_special_vertex_predicate(scan_opr.params().predicate());
      if (sp_vertex_pred.has_value()) {
        return std::make_pair(
            std::make_unique<ScanWithSPredOpr>(scan_params, *sp_vertex_pred),
            ret_meta);
      } else {
        return std::make_pair(std::make_unique<ScanWithGPredOpr>(
                                  scan_params, scan_opr.params().predicate()),
                              ret_meta);
      }
    } else {
      return std::make_pair(std::make_unique<ScanWithoutPredOpr>(scan_params),
                            ret_meta);
    }
  }
}