in flex/engines/graph_db/runtime/execute/ops/retrieve/scan.cc [471:637]
// Builds the read operator for the scan at `plan.plan(op_idx)`.
//
// The concrete operator is chosen from three properties of the scan:
//   * whether it carries an index predicate, and if so whether
//     ScanUtils::check_idx_predicate reports it as an oid lookup
//     (`scan_oid == true`) or not (ids are then parsed as kInt64 and handed
//     to the FilterGids* operators);
//   * whether `params()` carries a predicate, and if so whether
//     parse_special_vertex_predicate recognises it (S-pred operators) or it
//     has to be passed on as a general expression (G-pred operators);
//   * for oid lookups, whether the primary keys of the scanned labels share a
//     single property type or span several (MultiType operators).
//
// Parameters:
//   schema   - queried for the number of vertex labels and for the primary
//              key definition of each scanned label.
//   ctx_meta - not used by this builder.
//   plan     - physical plan containing the scan operator.
//   op_idx   - index of the scan operator inside `plan`.
//
// Returns a pair of (operator, ContextMeta). The meta always has the scan's
// alias (or -1 when the scan has none) set. The operator is nullptr when the
// scan is not supported: non-vertex scan, missing params, a non-zero lower
// limit, or an index predicate rejected by ScanUtils::check_idx_predicate.
bl::result<ReadOpBuildResultT> ScanOprBuilder::Build(
    const gs::Schema& schema, const ContextMeta& ctx_meta,
    const physical::PhysicalPlan& plan, int op_idx) {
  // Bind by reference: the scan message is only read here, so there is no
  // need to deep-copy it (including its predicate expressions) per build.
  const auto& scan_opr = plan.plan(op_idx).opr().scan();

  const int alias = scan_opr.has_alias() ? scan_opr.alias().value() : -1;
  ContextMeta ret_meta;
  ret_meta.set(alias);

  if (scan_opr.scan_opt() != physical::Scan::VERTEX) {
    LOG(ERROR) << "Currently only support scan vertex";
    return std::make_pair(nullptr, ret_meta);
  }
  if (!scan_opr.has_params()) {
    LOG(ERROR) << "Scan operator should have params";
    return std::make_pair(nullptr, ret_meta);
  }

  ScanParams scan_params;
  scan_params.alias = alias;
  // No explicit (positive) upper bound means "unlimited".
  scan_params.limit = std::numeric_limits<int32_t>::max();
  if (scan_opr.params().has_limit()) {
    const auto& limit_range = scan_opr.params().limit();
    if (limit_range.lower() != 0) {
      // Reject the plan like every other unsupported input instead of
      // aborting the whole process on a single malformed query.
      LOG(ERROR) << "Scan with lower limit expect 0, but got "
                 << limit_range.lower();
      return std::make_pair(nullptr, ret_meta);
    }
    if (limit_range.upper() > 0) {
      scan_params.limit = limit_range.upper();
    }
  }

  for (const auto& table : scan_opr.params().tables()) {
    // Workaround: the plan may reference label ids that are not valid vertex
    // labels in this schema; silently skip them.
    if (schema.vertex_label_num() <= table.id()) {
      continue;
    }
    scan_params.tables.emplace_back(table.id());
  }

  const bool has_predicate = scan_opr.params().has_predicate();

  // ---- Plain scan: no index predicate. ------------------------------------
  if (!scan_opr.has_idx_predicate()) {
    if (!has_predicate) {
      return std::make_pair(std::make_unique<ScanWithoutPredOpr>(scan_params),
                            ret_meta);
    }
    auto sp_vertex_pred =
        parse_special_vertex_predicate(scan_opr.params().predicate());
    if (sp_vertex_pred.has_value()) {
      return std::make_pair(
          std::make_unique<ScanWithSPredOpr>(scan_params, *sp_vertex_pred),
          ret_meta);
    }
    return std::make_pair(std::make_unique<ScanWithGPredOpr>(
                              scan_params, scan_opr.params().predicate()),
                          ret_meta);
  }

  // ---- Index scan. --------------------------------------------------------
  bool scan_oid = false;
  if (!ScanUtils::check_idx_predicate(scan_opr, scan_oid)) {
    LOG(ERROR) << "Index predicate is not supported"
               << scan_opr.DebugString();
    return std::make_pair(nullptr, ret_meta);
  }

  using OidParser = std::function<std::vector<Any>(ParamsType)>;
  // Builds one oid parser per distinct primary-key type among the scanned
  // labels (first-seen order). The result therefore has exactly one entry
  // when all labels share a primary-key type.
  const auto collect_oid_parsers = [&]() {
    std::vector<OidParser> parsers;
    std::set<int> seen_types;
    for (const auto& table : scan_params.tables) {
      const auto& pks = schema.get_vertex_primary_key(table);
      // Only the first primary-key column's type is consulted.
      const auto& [pk_type, unused_a, unused_b] = pks[0];
      if (seen_types.insert(static_cast<int>(pk_type.type_enum)).second) {
        parsers.emplace_back(ScanUtils::parse_ids_with_type(
            pk_type, scan_opr.idx_predicate()));
      }
    }
    return parsers;
  };

  if (!has_predicate) {
    if (!scan_oid) {
      auto gids = ScanUtils::parse_ids_with_type(PropertyType::kInt64,
                                                 scan_opr.idx_predicate());
      return std::make_pair(
          std::make_unique<FilterGidsWithoutPredOpr>(scan_params, gids),
          ret_meta);
    }
    // A single scanned label always yields exactly one parser, so the
    // single-label case needs no dedicated branch.
    auto oid_parsers = collect_oid_parsers();
    if (oid_parsers.size() == 1) {
      return std::make_pair(std::make_unique<FilterOidsWithoutPredOpr>(
                                scan_params, oid_parsers[0]),
                            ret_meta);
    }
    return std::make_pair(std::make_unique<FilterMultiTypeOidsWithoutPredOpr>(
                              scan_params, oid_parsers),
                          ret_meta);
  }

  // Index scan combined with an additional predicate.
  auto sp_vertex_pred =
      parse_special_vertex_predicate(scan_opr.params().predicate());

  if (!scan_oid) {
    auto gids = ScanUtils::parse_ids_with_type(PropertyType::kInt64,
                                               scan_opr.idx_predicate());
    if (sp_vertex_pred.has_value()) {
      return std::make_pair(std::make_unique<FilterGidsSPredOpr>(
                                scan_params, gids, *sp_vertex_pred),
                            ret_meta);
    }
    return std::make_pair(
        std::make_unique<FilterGidsGPredOpr>(scan_params, gids,
                                             scan_opr.params().predicate()),
        ret_meta);
  }

  auto oid_parsers = collect_oid_parsers();
  if (oid_parsers.size() == 1) {
    if (sp_vertex_pred.has_value()) {
      return std::make_pair(std::make_unique<FilterOidsSPredOpr>(
                                scan_params, oid_parsers[0], *sp_vertex_pred),
                            ret_meta);
    }
    return std::make_pair(
        std::make_unique<FilterOidsGPredOpr>(scan_params, oid_parsers[0],
                                             scan_opr.params().predicate()),
        ret_meta);
  }
  if (sp_vertex_pred.has_value()) {
    return std::make_pair(std::make_unique<FilterOidsMultiTypeSPredOpr>(
                              scan_params, oid_parsers, *sp_vertex_pred),
                          ret_meta);
  }
  return std::make_pair(
      std::make_unique<FilterOidsMultiTypeGPredOpr>(
          scan_params, oid_parsers, scan_opr.params().predicate()),
      ret_meta);
}