cpp-ch/local-engine/Parser/SerializedPlanParser.h (87 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <Core/Block.h> #include <Core/SortDescription.h> #include <DataTypes/DataTypeFactory.h> #include <Interpreters/ActionsDAG.h> #include <Interpreters/Aggregator.h> #include <Parser/CHColumnToSparkRow.h> #include <Parser/RelMetric.h> #include <Processors/QueryPlan/ISourceStep.h> #include <Processors/QueryPlan/QueryPlan.h> #include <Storages/SourceFromJavaIter.h> #include <base/types.h> #include <substrait/plan.pb.h> namespace local_engine { std::string join(const DB::ActionsDAG::NodeRawConstPtrs & v, char c); class SerializedPlanParser; class LocalExecutor; class ParserContext; class ExpressionParser; // Given a condition expression `cond_rel_`, find all nullable columns that cannot contain // null after this filter. // This is used to remove the nullability of those columns for performance reasons. 
class NonNullableColumnsResolver { public: explicit NonNullableColumnsResolver( const DB::Block & header_, std::shared_ptr<const ParserContext> parser_context_, const substrait::Expression & cond_rel_); ~NonNullableColumnsResolver() = default; // return column names std::set<String> resolve(); private: DB::Block header; std::shared_ptr<const ParserContext> parser_context; const substrait::Expression & cond_rel; std::unique_ptr<ExpressionParser> expression_parser; std::set<String> collected_columns; void visit(const substrait::Expression & expr); void visitNonNullable(const substrait::Expression & expr); }; class SerializedPlanParser { private: std::unique_ptr<LocalExecutor> createExecutor(DB::QueryPlanPtr query_plan, const substrait::Plan & s_plan) const; public: explicit SerializedPlanParser(std::shared_ptr<const ParserContext> parser_context_); /// visible for UT DB::QueryPlanPtr parse(const substrait::Plan & plan); std::unique_ptr<LocalExecutor> createExecutor(const substrait::Plan & plan); DB::QueryPipelineBuilderPtr buildQueryPipeline(DB::QueryPlan & query_plan) const; /// std::unique_ptr<LocalExecutor> createExecutor(const std::string_view plan); void addInputIter(jobject iter, bool materialize_input) { input_iters.emplace_back(iter); materialize_inputs.emplace_back(materialize_input); } std::pair<jobject, bool> getInputIter(size_t index) { if (index > input_iters.size()) throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Index({}) is overflow input_iters's size({})", index, input_iters.size()); return {input_iters[index], materialize_inputs[index]}; } void addSplitInfo(std::string && split_info) { split_infos.emplace_back(std::move(split_info)); } int nextSplitInfoIndex() { if (split_info_index >= split_infos.size()) throw DB::Exception( DB::ErrorCodes::LOGICAL_ERROR, "split info index out of range, split_info_index: {}, split_infos.size(): {}", split_info_index, split_infos.size()); return split_info_index++; } const String & nextSplitInfo() { auto 
next_index = nextSplitInfoIndex(); return split_infos.at(next_index); } RelMetricPtr getMetric() { return metrics.empty() ? nullptr : metrics.at(0); } std::vector<DB::QueryPlanPtr> extra_plan_holder; private: DB::QueryPlanPtr parseOp(const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack); static void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::Plan & plan); std::vector<jobject> input_iters; std::vector<std::string> split_infos; int split_info_index = 0; std::vector<bool> materialize_inputs; DB::ContextPtr context; std::shared_ptr<const ParserContext> parser_context; std::vector<RelMetricPtr> metrics; }; }