QueryPlanPtr SerializedPlanParser::parseOp()

in cpp-ch/local-engine/Parser/SerializedPlanParser.cpp [459:548]


/// Recursively converts a substrait relation into a QueryPlan.
///
/// @param rel        Relation to convert. Fetch and Read are handled here; Filter, Generate, Project,
///                   Aggregate, Sort, Window, Join and Expand are delegated to the parser registered in
///                   RelParserFactory. Any other relation type throws.
/// @param rel_stack  Ancestor relations of `rel`. A Fetch pushes itself before parsing its input and pops
///                   itself afterwards; delegated relation types receive the stack unchanged.
/// @return The query plan producing the output of `rel`.
/// @throws Exception with ErrorCodes::UNKNOWN_TYPE for unsupported relation types.
///
/// Side effect: when the `query_plan_enable_optimizations` setting is disabled, a RelMetric built from
/// the steps created for `rel` is recorded in `metrics`.
QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack)
{
    QueryPlanPtr query_plan;
    // Steps created for this relation only; they are attached to this relation's RelMetric below.
    std::vector<IQueryPlanStep *> steps;
    switch (rel.rel_type_case())
    {
        case substrait::Rel::RelTypeCase::kFetch: {
            // Make this fetch visible as an ancestor while its input is parsed.
            // NOTE(review): if the recursive parse throws, this entry is not popped.
            rel_stack.push_back(&rel);
            const auto & limit = rel.fetch();
            query_plan = parseOp(limit.input(), rel_stack);
            rel_stack.pop_back();
            auto limit_step = std::make_unique<LimitStep>(query_plan->getCurrentDataStream(), limit.count(), limit.offset());
            limit_step->setStepDescription("LIMIT");
            steps.emplace_back(limit_step.get());
            query_plan->addStep(std::move(limit_step));
            break;
        }
        case substrait::Rel::RelTypeCase::kRead: {
            const auto & read = rel.read();
            // TODO: We still maintain the old logic of parsing LocalFiles or ExtensionTable in ReadRel
            // to be compatible with some suites about metrics.
            // Remove this compatibility later, and then only the java iter has local files in ReadRel.
            if (read.has_local_files() || (!read.has_extension_table() && !isReadFromMergeTree(read)))
            {
                // base_schema belongs to ReadRel, not to the Rel wrapper.
                assert(read.has_base_schema());
                QueryPlanStepPtr step;
                if (isReadRelFromJava(read))
                    step = parseReadRealWithJavaIter(read);
                else
                    step = parseReadRealWithLocalFile(read);

                query_plan = std::make_unique<QueryPlan>();
                steps.emplace_back(step.get());
                query_plan->addStep(std::move(step));

                // Add a buffer after the source. It tries to preload data from the source and reduce
                // the waiting time of downstream nodes. Only worthwhile with more than one thread.
                if (context->getSettingsRef().max_threads > 1)
                {
                    auto buffer_step = std::make_unique<BlocksBufferPoolStep>(query_plan->getCurrentDataStream());
                    steps.emplace_back(buffer_step.get());
                    query_plan->addStep(std::move(buffer_step));
                }
            }
            else
            {
                // Merge-tree read: use the inline extension table if present, otherwise take the
                // next entry from split_infos.
                substrait::ReadRel::ExtensionTable extension_table;
                if (read.has_extension_table())
                    extension_table = read.extension_table();
                else
                    extension_table = parseExtensionTable(split_infos.at(nextSplitInfoIndex()));

                MergeTreeRelParser mergeTreeParser(this, context, query_context, global_context);
                // The merge-tree parser gets its own empty stack rather than `rel_stack`.
                std::list<const substrait::Rel *> stack;
                query_plan = mergeTreeParser.parseReadRel(std::make_unique<QueryPlan>(), read, extension_table, stack);
                steps = mergeTreeParser.getSteps();
            }
            break;
        }
        case substrait::Rel::RelTypeCase::kFilter:
        case substrait::Rel::RelTypeCase::kGenerate:
        case substrait::Rel::RelTypeCase::kProject:
        case substrait::Rel::RelTypeCase::kAggregate:
        case substrait::Rel::RelTypeCase::kSort:
        case substrait::Rel::RelTypeCase::kWindow:
        case substrait::Rel::RelTypeCase::kJoin:
        case substrait::Rel::RelTypeCase::kExpand: {
            // Delegate to the relation-specific parser and collect the steps it created.
            auto op_parser = RelParserFactory::instance().getBuilder(rel.rel_type_case())(this);
            query_plan = op_parser->parseOp(rel, rel_stack);
            auto parser_steps = op_parser->getSteps();
            steps.insert(steps.end(), parser_steps.begin(), parser_steps.end());
            break;
        }
        default:
            throw Exception(ErrorCodes::UNKNOWN_TYPE, "doesn't support relation type: {}.\n{}", rel.rel_type_case(), rel.DebugString());
    }

    if (!context->getSettingsRef().query_plan_enable_optimizations)
    {
        if (rel.rel_type_case() == substrait::Rel::RelTypeCase::kRead)
        {
            // Read relations are appended as a new metric with the next sequential id.
            size_t id = metrics.empty() ? 0 : metrics.back()->getId() + 1;
            metrics.emplace_back(std::make_shared<RelMetric>(id, String(magic_enum::enum_name(rel.rel_type_case())), steps));
        }
        else
            // Other relations replace the current metrics with one metric constructed from them
            // (presumably as its inputs/children — confirm against RelMetric).
            metrics = {std::make_shared<RelMetric>(String(magic_enum::enum_name(rel.rel_type_case())), metrics, steps)};
    }

    return query_plan;
}