in cpp-ch/local-engine/Parser/SerializedPlanParser.cpp [459:548]
/// Recursively converts one substrait relation into a QueryPlan.
///
/// @param rel        The relation to convert. Its `rel_type_case()` selects the handling:
///                   - kFetch: the input relation is parsed first, then a LimitStep is appended.
///                   - kRead:  a source step is built either from local files / a Java iterator,
///                             or from a MergeTree extension table.
///                   - kFilter, kGenerate, kProject, kAggregate, kSort, kWindow, kJoin, kExpand:
///                             delegated to the parser registered in RelParserFactory.
/// @param rel_stack  Stack of relations currently being parsed. For kFetch, `rel` is pushed while its
///                   input is parsed and popped afterwards; for the delegated relation types the stack
///                   is forwarded to the dedicated parser.
/// @return The query plan built for `rel`.
/// @throws Exception with ErrorCodes::UNKNOWN_TYPE for any other relation type.
///
/// Side effect: when the `query_plan_enable_optimizations` setting is off, `metrics` is updated with a
/// RelMetric that references the steps created for this relation.
QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack)
{
    QueryPlanPtr query_plan;
    // Non-owning pointers to the steps created for this relation; ownership is moved into the query plan.
    std::vector<IQueryPlanStep *> steps;
    switch (rel.rel_type_case())
    {
        case substrait::Rel::RelTypeCase::kFetch: {
            rel_stack.push_back(&rel);
            const auto & limit = rel.fetch();
            query_plan = parseOp(limit.input(), rel_stack);
            rel_stack.pop_back();
            auto limit_step = std::make_unique<LimitStep>(query_plan->getCurrentDataStream(), limit.count(), limit.offset());
            limit_step->setStepDescription("LIMIT");
            // Record the raw pointer before ownership is transferred to the plan.
            steps.emplace_back(limit_step.get());
            query_plan->addStep(std::move(limit_step));
            break;
        }
        case substrait::Rel::RelTypeCase::kRead: {
            const auto & read = rel.read();
            // TODO: We still maintain the old logic of parsing LocalFiles or ExtensionTable in ReadRel
            // to be compatible with some suites about metrics.
            // Remove this compatibility later, and then only the java iter has local files in ReadRel.
            if (read.has_local_files() || (!read.has_extension_table() && !isReadFromMergeTree(read)))
            {
                // base_schema is a field of ReadRel, not of the enclosing Rel, so it must be checked on `read`.
                assert(read.has_base_schema());
                QueryPlanStepPtr step;
                if (isReadRelFromJava(read))
                    step = parseReadRealWithJavaIter(read);
                else
                    step = parseReadRealWithLocalFile(read);
                query_plan = std::make_unique<QueryPlan>();
                steps.emplace_back(step.get());
                query_plan->addStep(std::move(step));
                // Add a buffer after the source; it tries to preload data from the source and reduce the
                // waiting time of downstream nodes.
                if (context->getSettingsRef().max_threads > 1)
                {
                    auto buffer_step = std::make_unique<BlocksBufferPoolStep>(query_plan->getCurrentDataStream());
                    steps.emplace_back(buffer_step.get());
                    query_plan->addStep(std::move(buffer_step));
                }
            }
            else
            {
                // Use the extension table embedded in the ReadRel when present; otherwise take it from
                // the next split info.
                substrait::ReadRel::ExtensionTable extension_table;
                if (read.has_extension_table())
                    extension_table = read.extension_table();
                else
                    extension_table = parseExtensionTable(split_infos.at(nextSplitInfoIndex()));
                MergeTreeRelParser mergeTreeParser(this, context, query_context, global_context);
                // The MergeTree parser is given a fresh, empty relation stack rather than `rel_stack`.
                std::list<const substrait::Rel *> stack;
                query_plan = mergeTreeParser.parseReadRel(std::make_unique<QueryPlan>(), read, extension_table, stack);
                steps = mergeTreeParser.getSteps();
            }
            break;
        }
        case substrait::Rel::RelTypeCase::kFilter:
        case substrait::Rel::RelTypeCase::kGenerate:
        case substrait::Rel::RelTypeCase::kProject:
        case substrait::Rel::RelTypeCase::kAggregate:
        case substrait::Rel::RelTypeCase::kSort:
        case substrait::Rel::RelTypeCase::kWindow:
        case substrait::Rel::RelTypeCase::kJoin:
        case substrait::Rel::RelTypeCase::kExpand: {
            // These relation types are handled by dedicated parsers looked up by relation type.
            auto op_parser = RelParserFactory::instance().getBuilder(rel.rel_type_case())(this);
            query_plan = op_parser->parseOp(rel, rel_stack);
            const auto & parser_steps = op_parser->getSteps();
            steps.insert(steps.end(), parser_steps.begin(), parser_steps.end());
            break;
        }
        default:
            throw Exception(ErrorCodes::UNKNOWN_TYPE, "doesn't support relation type: {}.\n{}", rel.rel_type_case(), rel.DebugString());
    }

    if (!context->getSettingsRef().query_plan_enable_optimizations)
    {
        if (rel.rel_type_case() == substrait::Rel::RelTypeCase::kRead)
        {
            // A read is appended to the current metrics with the next sequential id.
            size_t id = metrics.empty() ? 0 : metrics.back()->getId() + 1;
            metrics.emplace_back(std::make_shared<RelMetric>(id, String(magic_enum::enum_name(rel.rel_type_case())), steps));
        }
        else
        {
            // Any other relation replaces the current metrics with a single RelMetric built from them
            // (presumably as its inputs -- confirm against the RelMetric constructor).
            metrics = {std::make_shared<RelMetric>(String(magic_enum::enum_name(rel.rel_type_case())), metrics, steps)};
        }
    }
    return query_plan;
}