cpp-ch/local-engine/Parser/SerializedPlanParser.cpp (360 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "SerializedPlanParser.h" #include <algorithm> #include <memory> #include <string> #include <string_view> #include <AggregateFunctions/AggregateFunctionFactory.h> #include <Columns/ColumnConst.h> #include <Core/Block.h> #include <Core/ColumnWithTypeAndName.h> #include <Core/Names.h> #include <Core/Settings.h> #include <DataTypes/DataTypeAggregateFunction.h> #include <DataTypes/DataTypesDecimal.h> #include <DataTypes/IDataType.h> #include <Functions/FunctionFactory.h> #include <Interpreters/Context.h> #include <Interpreters/ProcessList.h> #include <Interpreters/QueryPriorities.h> #include <Operator/BlocksBufferPoolTransform.h> #include <Parser/ExpressionParser.h> #include <Parser/FunctionParser.h> #include <Parser/LocalExecutor.h> #include <Parser/ParserContext.h> #include <Parser/RelParsers/ReadRelParser.h> #include <Parser/RelParsers/RelParser.h> #include <Parser/RelParsers/WriteRelParser.h> #include <Parser/SubstraitParserUtils.h> #include <Parser/TypeParser.h> #include <Parsers/ExpressionListParsers.h> #include <Processors/Executors/PipelineExecutor.h> #include <Processors/QueryPlan/AggregatingStep.h> #include <Processors/QueryPlan/ExpressionStep.h> #include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h> #include <Processors/QueryPlan/QueryPlan.h> #include <Storages/MergeTree/MergeTreeData.h> #include <Storages/SubstraitSource/SubstraitFileSource.h> #include <Storages/SubstraitSource/SubstraitFileSourceStep.h> #include <google/protobuf/wrappers.pb.h> #include <Common/BlockTypeUtils.h> #include <Common/DebugUtils.h> #include <Common/Exception.h> #include <Common/GlutenConfig.h> #include <Common/PlanUtil.h> #include <Common/logger_useful.h> #include <Common/typeid_cast.h> namespace DB { namespace Setting { extern const SettingsUInt64 priority; extern const SettingsMilliseconds low_priority_query_wait_time_ms; } namespace ErrorCodes { extern const int LOGICAL_ERROR; extern const int UNKNOWN_TYPE; extern const int BAD_ARGUMENTS; extern const int NO_SUCH_DATA_PART; extern const int UNKNOWN_FUNCTION; extern const int CANNOT_PARSE_PROTOBUF_SCHEMA; extern const int ILLEGAL_TYPE_OF_ARGUMENT; extern const int INVALID_JOIN_ON_EXPRESSION; } } namespace local_engine { using namespace DB; std::string join(const ActionsDAG::NodeRawConstPtrs & v, char c) { std::string res; for (size_t i = 0; i < v.size(); ++i) { if (i) res += c; res += v[i]->result_name; } return res; } void SerializedPlanParser::adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::Plan & plan) { const substrait::PlanRel & root_rel = plan.relations().at(0); if (root_rel.root().names_size()) { auto columns = query_plan->getCurrentHeader().columns(); if (columns != static_cast<size_t>(root_rel.root().names_size())) { debug::dumpPlan(*query_plan, "clickhouse plan", true); debug::dumpMessage(plan, "substrait::Plan", true); throw Exception( ErrorCodes::LOGICAL_ERROR, "Missmatch result columns size. plan column size {}, subtrait plan name size {}.", columns, root_rel.root().names_size()); } PlanUtil::renamePlanHeader( *query_plan, [&root_rel](const Block & input, NamesWithAliases & aliases) { auto output_name = root_rel.root().names().begin(); for (auto input_iter = input.begin(); input_iter != input.end(); ++output_name, ++input_iter) aliases.emplace_back(DB::NameWithAlias(input_iter->name, *output_name)); }); } // fixes: issue-1874, to keep the nullability as expected. const auto & output_schema = root_rel.root().output_schema(); if (output_schema.types_size()) { auto origin_header = query_plan->getCurrentHeader(); const auto & origin_columns = origin_header.getColumnsWithTypeAndName(); if (static_cast<size_t>(output_schema.types_size()) != origin_columns.size()) { debug::dumpPlan(*query_plan, "clickhouse plan", true); debug::dumpMessage(plan, "substrait::Plan", true); throw Exception( ErrorCodes::LOGICAL_ERROR, "Missmatch result columns size. plan column size {}, subtrait plan output schema size {}, subtrait plan name size {}.", origin_columns.size(), output_schema.types_size(), root_rel.root().names_size()); } bool need_final_project = false; ColumnsWithTypeAndName final_columns; for (int i = 0; i < output_schema.types_size(); ++i) { const auto & origin_column = origin_columns[i]; const auto & origin_type = origin_column.type; auto final_type = TypeParser::parseType(output_schema.types(i)); /// Intermediate aggregate data is special, no check here. if (typeid_cast<const DataTypeAggregateFunction *>(origin_column.type.get()) || origin_type->equals(*final_type)) final_columns.push_back(origin_column); else { need_final_project = true; if (origin_column.column && isColumnConst(*origin_column.column)) { /// For const column, we need to cast it individually. Otherwise, the const column will be converted to full column in /// ActionsDAG::makeConvertingActions. /// Note: creating fianl_column with Field of origin_column will cause Exception in some case. const DB::ContextPtr context = DB::CurrentThread::get().getQueryContext(); const FunctionOverloadResolverPtr & cast_resolver = FunctionFactory::instance().get("CAST", context); const DataTypePtr string_type = std::make_shared<DataTypeString>(); ColumnWithTypeAndName to_type_column = {string_type->createColumnConst(1, final_type->getName()), string_type, "__cast_const__"}; FunctionBasePtr cast_function = cast_resolver->build({origin_column, to_type_column}); ColumnPtr const_col = ColumnConst::create(cast_function->execute({origin_column, to_type_column}, final_type, 1, false), 1); ColumnWithTypeAndName final_column(const_col, final_type, origin_column.name); final_columns.emplace_back(std::move(final_column)); } else { ColumnWithTypeAndName final_column(final_type->createColumn(), final_type, origin_column.name); final_columns.emplace_back(std::move(final_column)); } } } if (need_final_project) { ActionsDAG final_project = ActionsDAG::makeConvertingActions(origin_columns, final_columns, ActionsDAG::MatchColumnsMode::Position, true); QueryPlanStepPtr final_project_step = std::make_unique<ExpressionStep>(query_plan->getCurrentHeader(), std::move(final_project)); final_project_step->setStepDescription("Project for output schema"); query_plan->addStep(std::move(final_project_step)); } } } QueryPlanPtr SerializedPlanParser::parse(const substrait::Plan & plan) { debug::dumpMessage(plan, "substrait::Plan"); //parseExtensions(plan.extensions()); if (plan.relations_size() != 1) throw Exception(ErrorCodes::BAD_ARGUMENTS, "too many relations found"); const substrait::PlanRel & root_rel = plan.relations().at(0); if (!root_rel.has_root()) throw Exception(ErrorCodes::BAD_ARGUMENTS, "must have root rel!"); const bool writePipeline = root_rel.root().input().has_write(); const substrait::Rel & first_read_rel = writePipeline ? root_rel.root().input().write().input() : root_rel.root().input(); std::list<const substrait::Rel *> rel_stack; auto query_plan = parseOp(first_read_rel, rel_stack); if (!writePipeline) adjustOutput(query_plan, plan); #ifndef NDEBUG PlanUtil::checkOuputType(*query_plan); #endif debug::dumpPlan(*query_plan); return query_plan; } std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(const substrait::Plan & plan) { return createExecutor(parse(plan), plan); } QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack) { auto rel_parser = RelParserFactory::instance().getBuilder(rel.rel_type_case())(parser_context); auto all_input_rels = rel_parser->getInputs(rel); assert(all_input_rels.empty() || all_input_rels.size() == 1 || all_input_rels.size() == 2); std::vector<DB::QueryPlanPtr> input_query_plans; rel_stack.push_back(&rel); for (const auto * input_rel : all_input_rels) { auto input_query_plan = parseOp(*input_rel, rel_stack); input_query_plans.push_back(std::move(input_query_plan)); } rel_stack.pop_back(); /// Sperical process for read relation because it may be incomplete when reading from scans/mergetrees/ranges. if (rel.rel_type_case() == substrait::Rel::RelTypeCase::kRead) { chassert(all_input_rels.empty()); auto read_rel_parser = std::dynamic_pointer_cast<ReadRelParser>(rel_parser); const auto & read = rel.read(); if (read_rel_parser->isReadRelFromJavaIter(read)) { /// If read from java iter, local_files is guranteed to be set in read rel. auto iter = read.local_files().items().at(0).uri_file(); auto pos = iter.find(':'); auto iter_index = std::stoi(iter.substr(pos + 1, iter.size())); auto [input_iter, materalize_input] = getInputIter(static_cast<size_t>(iter_index)); read_rel_parser->setInputIter(input_iter, materalize_input); } else if (read_rel_parser->isReadRelFromMergeTree(read)) { if (!read.has_extension_table()) read_rel_parser->setSplitInfo(nextSplitInfo()); } else if (read_rel_parser->isReadRelFromRange(read)) { if (!read.has_extension_table()) read_rel_parser->setSplitInfo(nextSplitInfo()); } else if (read_rel_parser->isReadRelFromLocalFile(read)) { if (!read.has_local_files()) read_rel_parser->setSplitInfo(nextSplitInfo()); } else if (read_rel_parser->isReadFromStreamKafka(read)) { read_rel_parser->setSplitInfo(nextSplitInfo()); } } DB::QueryPlanPtr query_plan = rel_parser->parse(input_query_plans, rel, rel_stack); for (auto & extra_plan : rel_parser->extraPlans()) { extra_plan_holder.push_back(std::move(extra_plan)); } std::vector<DB::IQueryPlanStep *> steps = rel_parser->getSteps(); if (const auto & settings = parser_context->queryContext()->getSettingsRef(); settingsEqual(settings, RuntimeSettings::COLLECT_METRICS, "true", {RuntimeSettings::COLLECT_METRICS_DEFAULT})) { if (rel.rel_type_case() == substrait::Rel::RelTypeCase::kRead) { size_t id = metrics.empty() ? 0 : metrics.back()->getId() + 1; metrics.emplace_back(std::make_shared<RelMetric>(id, String(magic_enum::enum_name(rel.rel_type_case())), steps)); } else metrics = {std::make_shared<RelMetric>(String(magic_enum::enum_name(rel.rel_type_case())), metrics, steps)}; } return query_plan; } DB::QueryPipelineBuilderPtr SerializedPlanParser::buildQueryPipeline(DB::QueryPlan & query_plan) const { const Settings & settings = parser_context->queryContext()->getSettingsRef(); QueryPriorities priorities; const auto query_status = std::make_shared<QueryStatus>( parser_context->queryContext(), "", 0, // since we set a query to empty string, let's set hash to zero. parser_context->queryContext()->getClientInfo(), priorities.insert( settings[Setting::priority], std::chrono::milliseconds(settings[Setting::low_priority_query_wait_time_ms].totalMilliseconds())), CurrentThread::getGroup(), IAST::QueryKind::Select, settings, 0); QueryPlanOptimizationSettings optimization_settings{context}; // TODO: set optimize_plan to true when metrics could be collected while ch query plan optimization is enabled. if (settingsEqual(settings, RuntimeSettings::COLLECT_METRICS, "true", {RuntimeSettings::COLLECT_METRICS_DEFAULT})) optimization_settings.optimize_plan = false; BuildQueryPipelineSettings build_settings = BuildQueryPipelineSettings{context}; build_settings.process_list_element = query_status; build_settings.progress_callback = nullptr; return query_plan.buildQueryPipeline(optimization_settings, build_settings); } std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(const std::string_view plan) { const auto s_plan = BinaryToMessage<substrait::Plan>(plan); return createExecutor(parse(s_plan), s_plan); } std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPlanPtr query_plan, const substrait::Plan & s_plan) const { Stopwatch stopwatch; DB::QueryPipelineBuilderPtr builder = nullptr; try { builder = buildQueryPipeline(*query_plan); } catch (...) { debug::dumpPlan(*query_plan, "Invalid clickhouse plan", true); throw; } assert(s_plan.relations_size() == 1); const substrait::PlanRel & root_rel = s_plan.relations().at(0); assert(root_rel.has_root()); if (root_rel.root().input().has_write()) addSinkTransform(parser_context->queryContext(), root_rel.root().input().write(), builder); LOG_INFO(getLogger("SerializedPlanParser"), "build pipeline {} ms", stopwatch.elapsedMicroseconds() / 1000.0); auto config = ExecutorConfig::loadFromContext(parser_context->queryContext()); return std::make_unique<LocalExecutor>(std::move(query_plan), std::move(builder), config.dump_pipeline); } SerializedPlanParser::SerializedPlanParser(ParserContextPtr parser_context_) : parser_context(parser_context_) { context = parser_context->queryContext(); } NonNullableColumnsResolver::NonNullableColumnsResolver( const Block & header_, ParserContextPtr parser_context_, const substrait::Expression & cond_rel_) : header(header_), parser_context(parser_context_), cond_rel(cond_rel_) { expression_parser = std::make_unique<ExpressionParser>(parser_context); } // make it simple at present if the condition contains or, return empty for both side. std::set<String> NonNullableColumnsResolver::resolve() { collected_columns.clear(); visit(cond_rel); return collected_columns; } // TODO: make it the same as spark, it's too simple at present. void NonNullableColumnsResolver::visit(const substrait::Expression & expr) { if (!expr.has_scalar_function()) return; const auto & scalar_function = expr.scalar_function(); auto function_name = expression_parser->safeGetFunctionName(scalar_function); // Only some special functions are used to judge whether the column is non-nullable. if (function_name == "and") { visit(scalar_function.arguments(0).value()); visit(scalar_function.arguments(1).value()); } else if (function_name == "greaterOrEquals" || function_name == "greater") { // If it's the case, a > x, what ever x or a is, a and x are non-nullable. // a or x may be a column, or a simple expression like plus etc. visitNonNullable(scalar_function.arguments(0).value()); visitNonNullable(scalar_function.arguments(1).value()); } else if (function_name == "lessOrEquals" || function_name == "less") { // same as gt, gte. visitNonNullable(scalar_function.arguments(0).value()); visitNonNullable(scalar_function.arguments(1).value()); } else if (function_name == "isNotNull") { visitNonNullable(scalar_function.arguments(0).value()); } // else do nothing } void NonNullableColumnsResolver::visitNonNullable(const substrait::Expression & expr) { if (expr.has_scalar_function()) { const auto & scalar_function = expr.scalar_function(); auto function_name = expression_parser->safeGetFunctionName(scalar_function); if (function_name == "plus" || function_name == "minus" || function_name == "multiply" || function_name == "divide") { visitNonNullable(scalar_function.arguments(0).value()); visitNonNullable(scalar_function.arguments(1).value()); } } else if (auto field_index = SubstraitParserUtils::getStructFieldIndex(expr)) { const auto & column_pos = *field_index; auto column_name = header.getByPosition(column_pos).name; collected_columns.insert(column_name); } // else, do nothing. } }