Status PipelineFragmentContext::_create_operator()

in be/src/pipeline/pipeline_fragment_context.cpp [1171:1654]


/// Translates one Thrift plan node into pipeline operator(s) and wires them into the pipelines.
///
/// The source-side operator for `tnode` is stored in `op` and appended to a pipeline. Node types
/// that split the pipeline (aggregation, joins, union, sort, analytic, ...) additionally create
/// one or more new pipelines via `add_pipeline`, give each of them a sink operator and record the
/// dependency edge in `_dag`.
///
/// @param pool       object pool forwarded to the operator constructors.
/// @param tnode      Thrift plan node to translate.
/// @param request    fragment parameters (parallel instances, exchange sender counts, query
///                   cache parameters, query options).
/// @param descs      descriptor table forwarded to the operator constructors.
/// @param op         [out] operator created for `tnode`.
/// @param cur_pipe   [in,out] pipeline to build into; for some node types it is replaced by a
///                   newly created pipeline before returning.
/// @param parent_idx index of the parent node, used to look up `_pipeline_parent_map`.
/// @param child_idx  index of this node among its parent's children, used for the same lookup.
/// @param followed_by_shuffled_operator forwarded to the agg / sort / analytic operators.
/// @return OK on success; InternalError for an unsupported node type, an illegal aggregate node,
///         a disabled JDBC scan or a query-cache request without scan ranges; otherwise the
///         first error returned while adding operators or initialising sinks.
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
                                                 const doris::TPipelineFragmentParams& request,
                                                 const DescriptorTbl& descs, OperatorPtr& op,
                                                 PipelinePtr& cur_pipe, int parent_idx,
                                                 int child_idx,
                                                 const bool followed_by_shuffled_operator) {
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
    // Therefore, here we need to use a stack-like structure.
    // NOTE(review): pop() presumably swaps `cur_pipe` for a pipeline that a parent node
    // pre-registered for (parent_idx, child_idx) -- confirm against _pipeline_parent_map.
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
    const bool enable_query_cache = request.fragment.__isset.query_cache_param;
    // Parallelism passed to every add_operator() call: the requested number of parallel
    // instances when the field is set, otherwise 0.
    const int parallelism = request.__isset.parallel_instances ? request.parallel_instances : 0;

    // Set by scan-like sources when the plan node carries no `is_serial_operator` field; such
    // operators are marked serial at the end of this function (see below).
    bool fe_with_old_version = false;
    switch (tnode.node_type) {
    case TPlanNodeType::OLAP_SCAN_NODE: {
        op.reset(new OlapScanOperatorX(
                pool, tnode, next_operator_id(), descs, _num_instances,
                enable_query_cache ? request.fragment.query_cache_param : TQueryCacheParam {}));
        RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));
        fe_with_old_version = !tnode.__isset.is_serial_operator;
        break;
    }
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
        // The memory-tracker flag is only maintained in debug builds.
#ifndef NDEBUG
        DCHECK(_query_ctx != nullptr);
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
#endif
        op.reset(new GroupCommitOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
        RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));
        fe_with_old_version = !tnode.__isset.is_serial_operator;
        break;
    }
    case doris::TPlanNodeType::JDBC_SCAN_NODE: {
        if (config::enable_java_support) {
            op.reset(new JDBCScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
            RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));
        } else {
            return Status::InternalError(
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
                    "to true and restart be.");
        }
        fe_with_old_version = !tnode.__isset.is_serial_operator;
        break;
    }
    case doris::TPlanNodeType::FILE_SCAN_NODE: {
        op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
        RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));
        fe_with_old_version = !tnode.__isset.is_serial_operator;
        break;
    }
    case TPlanNodeType::ES_SCAN_NODE:
    case TPlanNodeType::ES_HTTP_SCAN_NODE: {
        op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
        RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));
        fe_with_old_version = !tnode.__isset.is_serial_operator;
        break;
    }
    case TPlanNodeType::EXCHANGE_NODE: {
        // Sender count is looked up by node id; a missing entry yields 0, which is only caught
        // by the debug-build check below.
        int num_senders = find_with_default(request.per_exch_num_senders, tnode.node_id, 0);
        DCHECK_GT(num_senders, 0);
        op.reset(new ExchangeSourceOperatorX(pool, tnode, next_operator_id(), descs, num_senders));
        RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));
        fe_with_old_version = !tnode.__isset.is_serial_operator;
        break;
    }
    case TPlanNodeType::AGGREGATION_NODE: {
        if (tnode.agg_node.grouping_exprs.empty() &&
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
                                         ": group by and output is empty");
        }
        // A cache source/sink pair is inserted only for the aggregation node that the query
        // cache parameters point at.
        const bool need_create_cache_op =
                enable_query_cache && tnode.node_id == request.fragment.query_cache_param.node_id;
        // Appends a CacheSourceOperatorX to `cur_pipe`, creates a new pipeline ending in the
        // matching CacheSinkOperatorX and returns that pipeline through `new_pipe`.
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) -> Status {
            // The cache node id is the first key of the first instance's scan ranges; reject
            // requests that carry none instead of dereferencing an empty container.
            if (request.local_params.empty() ||
                request.local_params[0].per_node_scan_ranges.empty()) {
                return Status::InternalError(
                        "Query cache requires scan ranges in the first local param, but none "
                        "were provided");
            }
            auto cache_node_id = request.local_params[0].per_node_scan_ranges.begin()->first;
            auto cache_source_id = next_operator_id();
            op.reset(new CacheSourceOperatorX(pool, cache_node_id, cache_source_id,
                                              request.fragment.query_cache_param));
            RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));

            const auto downstream_pipeline_id = cur_pipe->id();
            if (_dag.find(downstream_pipeline_id) == _dag.end()) {
                _dag.insert({downstream_pipeline_id, {}});
            }
            new_pipe = add_pipeline(cur_pipe);
            _dag[downstream_pipeline_id].push_back(new_pipe->id());

            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
                    next_sink_operator_id(), cache_source_id, op->operator_id()));
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
            return Status::OK();
        };
        const bool group_by_limit_opt =
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;

        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
        const bool enable_spill = _runtime_state->enable_spill() &&
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
                                      tnode.agg_node.use_streaming_preaggregation &&
                                      !tnode.agg_node.grouping_exprs.empty();
        // Streaming aggregation without aggregate functions is a pure distinct; use the
        // dedicated operator when the query option allows it.
        const bool can_use_distinct_streaming_agg =
                is_streaming_agg && tnode.agg_node.aggregate_functions.empty() &&
                request.query_options.__isset.enable_distinct_streaming_aggregation &&
                request.query_options.enable_distinct_streaming_aggregation;

        if (can_use_distinct_streaming_agg) {
            if (need_create_cache_op) {
                PipelinePtr new_pipe;
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));

                op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
                                                           _require_bucket_distribution));
                op->set_followed_by_shuffled_operator(false);
                _require_bucket_distribution = true;
                RETURN_IF_ERROR(new_pipe->add_operator(op, parallelism));
                // The cache source sits at the front of the old pipeline; the agg operator in
                // the new pipeline becomes its child.
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
                cur_pipe = new_pipe;
            } else {
                op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
                                                           _require_bucket_distribution));
                op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
                _require_bucket_distribution =
                        _require_bucket_distribution || op->require_data_distribution();
                RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));
            }
        } else if (is_streaming_agg) {
            if (need_create_cache_op) {
                PipelinePtr new_pipe;
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));

                op.reset(new StreamingAggOperatorX(pool, next_operator_id(), tnode, descs));
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
                RETURN_IF_ERROR(new_pipe->add_operator(op, parallelism));
                cur_pipe = new_pipe;
            } else {
                op.reset(new StreamingAggOperatorX(pool, next_operator_id(), tnode, descs));
                RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));
            }
        } else {
            // create new pipeline to add query cache operator
            PipelinePtr new_pipe;
            if (need_create_cache_op) {
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
            }

            if (enable_spill) {
                op.reset(new PartitionedAggSourceOperatorX(pool, tnode, next_operator_id(), descs));
            } else {
                op.reset(new AggSourceOperatorX(pool, tnode, next_operator_id(), descs));
            }
            if (need_create_cache_op) {
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
                RETURN_IF_ERROR(new_pipe->add_operator(op, parallelism));
                cur_pipe = new_pipe;
            } else {
                RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));
            }

            // Blocking aggregation: the agg source stays in the current pipeline and the agg
            // sink terminates a newly created pipeline, which becomes `cur_pipe`.
            const auto downstream_pipeline_id = cur_pipe->id();
            if (_dag.find(downstream_pipeline_id) == _dag.end()) {
                _dag.insert({downstream_pipeline_id, {}});
            }
            cur_pipe = add_pipeline(cur_pipe);
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());

            DataSinkOperatorPtr sink;
            if (enable_spill) {
                sink.reset(new PartitionedAggSinkOperatorX(pool, next_sink_operator_id(),
                                                           op->operator_id(), tnode, descs,
                                                           _require_bucket_distribution));
            } else {
                sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), op->operator_id(),
                                                tnode, descs, _require_bucket_distribution));
            }
            sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
            _require_bucket_distribution =
                    _require_bucket_distribution || sink->require_data_distribution();
            RETURN_IF_ERROR(cur_pipe->set_sink(sink));
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
        }
        break;
    }
    case TPlanNodeType::HASH_JOIN_NODE: {
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
                                       tnode.hash_join_node.is_broadcast_join;
        const auto enable_spill = _runtime_state->enable_spill();
        if (enable_spill && !is_broadcast_join) {
            // Copy of the plan node with its runtime filters removed.
            auto tnode_ = tnode;
            tnode_.runtime_filters.clear();
            uint32_t partition_count = _runtime_state->spill_hash_join_partition_count();
            auto inner_probe_operator =
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);

            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
            // So here use `tnode_` which has no runtime filters.
            auto probe_side_inner_sink_operator =
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);

            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));

            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
                    pool, tnode_, next_operator_id(), descs, partition_count);
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
                                                inner_probe_operator);
            op = std::move(probe_operator);
            RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));

            const auto downstream_pipeline_id = cur_pipe->id();
            if (_dag.find(downstream_pipeline_id) == _dag.end()) {
                _dag.insert({downstream_pipeline_id, {}});
            }
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());

            // The build-side inner sink keeps the original `tnode` (runtime filters included),
            // while the partitioned wrapper is built from the filter-free copy.
            auto inner_sink_operator =
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs,
                    partition_count);
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));

            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
            DataSinkOperatorPtr sink = std::move(sink_operator);
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));

            // Register the probe pipeline first, then the build pipeline, for the children of
            // this node.
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
            sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
            op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
        } else {
            op.reset(new HashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs));
            RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));

            const auto downstream_pipeline_id = cur_pipe->id();
            if (_dag.find(downstream_pipeline_id) == _dag.end()) {
                _dag.insert({downstream_pipeline_id, {}});
            }
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());

            DataSinkOperatorPtr sink;
            sink.reset(new HashJoinBuildSinkOperatorX(pool, next_sink_operator_id(),
                                                      op->operator_id(), tnode, descs));
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));

            _pipeline_parent_map.push(op->node_id(), cur_pipe);
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
            sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
            op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
        }
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
            // One shared state for all instances, with one sink dependency per instance,
            // registered under the probe operator's id.
            std::shared_ptr<HashJoinSharedState> shared_state =
                    HashJoinSharedState::create_shared(_num_instances);
            for (int i = 0; i < _num_instances; i++) {
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
                sink_dep->set_shared_state(shared_state.get());
                shared_state->sink_deps.push_back(sink_dep);
            }
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
                                                     op->node_id(), "HASH_JOIN_PROBE");
            _op_id_to_shared_state.insert(
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
        }
        _require_bucket_distribution =
                _require_bucket_distribution || op->require_data_distribution();
        break;
    }
    case TPlanNodeType::CROSS_JOIN_NODE: {
        op.reset(new NestedLoopJoinProbeOperatorX(pool, tnode, next_operator_id(), descs));
        RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));

        const auto downstream_pipeline_id = cur_pipe->id();
        if (_dag.find(downstream_pipeline_id) == _dag.end()) {
            _dag.insert({downstream_pipeline_id, {}});
        }
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());

        DataSinkOperatorPtr sink;
        sink.reset(new NestedLoopJoinBuildSinkOperatorX(pool, next_sink_operator_id(),
                                                        op->operator_id(), tnode, descs));
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
        break;
    }
    case TPlanNodeType::UNION_NODE: {
        int child_count = tnode.num_children;
        op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), descs));
        op->set_followed_by_shuffled_operator(_require_bucket_distribution);
        RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));

        // The DAG entry is created up front so that it exists even when the union has no
        // children.
        const auto downstream_pipeline_id = cur_pipe->id();
        if (_dag.find(downstream_pipeline_id) == _dag.end()) {
            _dag.insert({downstream_pipeline_id, {}});
        }
        // One pipeline and one UnionSinkOperatorX per child, the sink carrying the child index.
        for (int i = 0; i < child_count; i++) {
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
            DataSinkOperatorPtr sink;
            sink.reset(new UnionSinkOperatorX(i, next_sink_operator_id(), op->operator_id(), pool,
                                              tnode, descs));
            sink->set_followed_by_shuffled_operator(_require_bucket_distribution);
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
        }
        break;
    }
    case TPlanNodeType::SORT_NODE: {
        // Spilling takes precedence over local merge; it applies to full sorts only.
        const auto should_spill = _runtime_state->enable_spill() &&
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
        const bool use_local_merge =
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
        if (should_spill) {
            op.reset(new SpillSortSourceOperatorX(pool, tnode, next_operator_id(), descs));
        } else if (use_local_merge) {
            op.reset(new LocalMergeSortSourceOperatorX(pool, tnode, next_operator_id(), descs));
        } else {
            op.reset(new SortSourceOperatorX(pool, tnode, next_operator_id(), descs));
        }
        RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));

        const auto downstream_pipeline_id = cur_pipe->id();
        if (_dag.find(downstream_pipeline_id) == _dag.end()) {
            _dag.insert({downstream_pipeline_id, {}});
        }
        cur_pipe = add_pipeline(cur_pipe);
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());

        DataSinkOperatorPtr sink;
        if (should_spill) {
            sink.reset(new SpillSortSinkOperatorX(pool, next_sink_operator_id(), op->operator_id(),
                                                  tnode, descs, _require_bucket_distribution));
        } else {
            sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), op->operator_id(),
                                             tnode, descs, _require_bucket_distribution));
        }
        sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
        _require_bucket_distribution =
                _require_bucket_distribution || sink->require_data_distribution();
        RETURN_IF_ERROR(cur_pipe->set_sink(sink));
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
        break;
    }
    case doris::TPlanNodeType::PARTITION_SORT_NODE: {
        op.reset(new PartitionSortSourceOperatorX(pool, tnode, next_operator_id(), descs));
        RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));

        const auto downstream_pipeline_id = cur_pipe->id();
        if (_dag.find(downstream_pipeline_id) == _dag.end()) {
            _dag.insert({downstream_pipeline_id, {}});
        }
        cur_pipe = add_pipeline(cur_pipe);
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());

        DataSinkOperatorPtr sink;
        sink.reset(new PartitionSortSinkOperatorX(pool, next_sink_operator_id(), op->operator_id(),
                                                  tnode, descs));
        RETURN_IF_ERROR(cur_pipe->set_sink(sink));
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
        break;
    }
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
        op.reset(new AnalyticSourceOperatorX(pool, tnode, next_operator_id(), descs));
        RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));

        const auto downstream_pipeline_id = cur_pipe->id();
        if (_dag.find(downstream_pipeline_id) == _dag.end()) {
            _dag.insert({downstream_pipeline_id, {}});
        }
        cur_pipe = add_pipeline(cur_pipe);
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());

        DataSinkOperatorPtr sink;
        sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), op->operator_id(),
                                             tnode, descs, _require_bucket_distribution));
        sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
        _require_bucket_distribution =
                _require_bucket_distribution || sink->require_data_distribution();
        RETURN_IF_ERROR(cur_pipe->set_sink(sink));
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
        break;
    }
    case TPlanNodeType::INTERSECT_NODE: {
        // Set operations share one builder; the template argument selects intersect (true)
        // or except (false).
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx, request));
        op->set_followed_by_shuffled_operator(_require_bucket_distribution);
        break;
    }
    case TPlanNodeType::EXCEPT_NODE: {
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx, request));
        op->set_followed_by_shuffled_operator(_require_bucket_distribution);
        break;
    }
    case TPlanNodeType::REPEAT_NODE: {
        op.reset(new RepeatOperatorX(pool, tnode, next_operator_id(), descs));
        RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));
        break;
    }
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
        op.reset(new TableFunctionOperatorX(pool, tnode, next_operator_id(), descs));
        RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));
        break;
    }
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
        op.reset(new AssertNumRowsOperatorX(pool, tnode, next_operator_id(), descs));
        RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));
        break;
    }
    case TPlanNodeType::EMPTY_SET_NODE: {
        op.reset(new EmptySetSourceOperatorX(pool, tnode, next_operator_id(), descs));
        RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));
        break;
    }
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
        op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(), descs));
        RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));
        fe_with_old_version = !tnode.__isset.is_serial_operator;
        break;
    }
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
        op.reset(new SchemaScanOperatorX(pool, tnode, next_operator_id(), descs));
        RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));
        break;
    }
    case TPlanNodeType::META_SCAN_NODE: {
        op.reset(new MetaScanOperatorX(pool, tnode, next_operator_id(), descs));
        RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));
        break;
    }
    case TPlanNodeType::SELECT_NODE: {
        op.reset(new SelectOperatorX(pool, tnode, next_operator_id(), descs));
        RETURN_IF_ERROR(cur_pipe->add_operator(op, parallelism));
        break;
    }
    default:
        return Status::InternalError("Unsupported exec type in pipeline: {}",
                                     print_plan_node_type(tnode.node_type));
    }
    // A source whose plan node lacks `is_serial_operator` while the request does specify
    // `parallel_instances` is marked serial, and its pipeline is given that many tasks.
    if (request.__isset.parallel_instances && fe_with_old_version) {
        cur_pipe->set_num_tasks(request.parallel_instances);
        op->set_serial_operator();
    }

    return Status::OK();
}