in be/src/pipeline/pipeline_fragment_context.cpp [1171:1654]
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
const doris::TPipelineFragmentParams& request,
const DescriptorTbl& descs, OperatorPtr& op,
PipelinePtr& cur_pipe, int parent_idx,
int child_idx,
const bool followed_by_shuffled_operator) {
// Plan nodes arrive from Thrift in preorder-traversal order, so we construct
// each operator directly from that array and use a stack-like structure
// (_pipeline_parent_map) to attach it to the correct parent pipeline.
_pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
bool enable_query_cache = request.fragment.__isset.query_cache_param;
bool fe_with_old_version = false;
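// An FE old enough not to set `is_serial_operator` is detected per node; for
// such plans we fall back to the legacy behavior at the end of this function
// and size the pipeline by `parallel_instances`.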
switch (tnode.node_type) {
case TPlanNodeType::OLAP_SCAN_NODE: {
op.reset(new OlapScanOperatorX(
pool, tnode, next_operator_id(), descs, _num_instances,
enable_query_cache ? request.fragment.query_cache_param : TQueryCacheParam {}));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
fe_with_old_version = !tnode.__isset.is_serial_operator;
break;
}
case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
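// The group-commit flag on the query mem tracker is recorded only in debug
// builds: the whole block, not just the DCHECK, is guarded by NDEBUG.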
#ifndef NDEBUG
DCHECK(_query_ctx != nullptr);
_query_ctx->query_mem_tracker()->is_group_commit_load = true;
#endif
op.reset(new GroupCommitOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
fe_with_old_version = !tnode.__isset.is_serial_operator;
break;
}
case doris::TPlanNodeType::JDBC_SCAN_NODE: {
if (config::enable_java_support) {
op.reset(new JDBCScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
} else {
return Status::InternalError(
"JDBC scan node is disabled. Set the BE config `enable_java_support` to "
"true and restart the BE to enable it.");
}
fe_with_old_version = !tnode.__isset.is_serial_operator;
break;
}
case doris::TPlanNodeType::FILE_SCAN_NODE: {
op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
fe_with_old_version = !tnode.__isset.is_serial_operator;
break;
}
case TPlanNodeType::ES_SCAN_NODE:
case TPlanNodeType::ES_HTTP_SCAN_NODE: {
op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
fe_with_old_version = !tnode.__isset.is_serial_operator;
break;
}
case TPlanNodeType::EXCHANGE_NODE: {
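// An exchange source consumes data from every sender instance of the upstream
// fragment; the FE records the per-exchange sender count in
// `per_exch_num_senders`.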
int num_senders = find_with_default(request.per_exch_num_senders, tnode.node_id, 0);
DCHECK_GT(num_senders, 0);
op.reset(new ExchangeSourceOperatorX(pool, tnode, next_operator_id(), descs, num_senders));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
fe_with_old_version = !tnode.__isset.is_serial_operator;
break;
}
case TPlanNodeType::AGGREGATION_NODE: {
if (tnode.agg_node.grouping_exprs.empty() &&
descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
": group by and output is empty");
}
bool need_create_cache_op =
enable_query_cache && tnode.node_id == request.fragment.query_cache_param.node_id;
auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
auto cache_node_id = request.local_params[0].per_node_scan_ranges.begin()->first;
auto cache_source_id = next_operator_id();
op.reset(new CacheSourceOperatorX(pool, cache_node_id, cache_source_id,
request.fragment.query_cache_param));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
_dag.insert({downstream_pipeline_id, {}});
}
new_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(new_pipe->id());
DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
next_sink_operator_id(), cache_source_id, op->operator_id()));
RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
return Status::OK();
};
const bool group_by_limit_opt =
tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
/// PartitionedAggSourceOperatorX does not yet support the "group by limit"
/// optimization (#29641). If `group_by_limit_opt` is true, the aggregation is
/// bounded by the limit and might not need to spill at all.
const bool enable_spill = _runtime_state->enable_spill() &&
!tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
tnode.agg_node.use_streaming_preaggregation &&
!tnode.agg_node.grouping_exprs.empty();
const bool can_use_distinct_streaming_agg =
is_streaming_agg && tnode.agg_node.aggregate_functions.empty() &&
request.query_options.__isset.enable_distinct_streaming_aggregation &&
request.query_options.enable_distinct_streaming_aggregation;
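// Three aggregation strategies, tried in order: distinct streaming
// aggregation (grouping with no aggregate functions, deduplicated on the
// fly), streaming pre-aggregation (single pass, no pipeline break), and the
// generic sink/source pair, which breaks the pipeline and can spill.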
if (can_use_distinct_streaming_agg) {
if (need_create_cache_op) {
PipelinePtr new_pipe;
RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
_require_bucket_distribution));
op->set_followed_by_shuffled_operator(false);
_require_bucket_distribution = true;
RETURN_IF_ERROR(new_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
cur_pipe = new_pipe;
} else {
op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
_require_bucket_distribution));
op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
_require_bucket_distribution =
_require_bucket_distribution || op->require_data_distribution();
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
}
} else if (is_streaming_agg) {
if (need_create_cache_op) {
PipelinePtr new_pipe;
RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
op.reset(new StreamingAggOperatorX(pool, next_operator_id(), tnode, descs));
RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
RETURN_IF_ERROR(new_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
cur_pipe = new_pipe;
} else {
op.reset(new StreamingAggOperatorX(pool, next_operator_id(), tnode, descs));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
}
} else {
// Create a new pipeline to hold the query cache operator, if one is needed.
PipelinePtr new_pipe;
if (need_create_cache_op) {
RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
}
if (enable_spill) {
op.reset(new PartitionedAggSourceOperatorX(pool, tnode, next_operator_id(), descs));
} else {
op.reset(new AggSourceOperatorX(pool, tnode, next_operator_id(), descs));
}
if (need_create_cache_op) {
RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
RETURN_IF_ERROR(new_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
cur_pipe = new_pipe;
} else {
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
}
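// Break the pipeline: the agg source stays on the downstream side, and a
// new upstream pipeline, registered as a dependency in the DAG, ends in
// the matching agg sink.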
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
_dag.insert({downstream_pipeline_id, {}});
}
cur_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorPtr sink;
if (enable_spill) {
sink.reset(new PartitionedAggSinkOperatorX(pool, next_sink_operator_id(),
op->operator_id(), tnode, descs,
_require_bucket_distribution));
} else {
sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), op->operator_id(),
tnode, descs, _require_bucket_distribution));
}
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
_require_bucket_distribution =
_require_bucket_distribution || sink->require_data_distribution();
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
}
break;
}
case TPlanNodeType::HASH_JOIN_NODE: {
const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
tnode.hash_join_node.is_broadcast_join;
const auto enable_spill = _runtime_state->enable_spill();
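// On the spillable (non-broadcast) path, partitioned sink/probe operators
// wrap plain inner build/probe operators (created with placeholder ids 0)
// that run per spilled partition; `tnode_` has its runtime filters cleared
// so only the real inner build sink, which keeps the original `tnode`,
// produces them.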
if (enable_spill && !is_broadcast_join) {
auto tnode_ = tnode;
tnode_.runtime_filters.clear();
uint32_t partition_count = _runtime_state->spill_hash_join_partition_count();
auto inner_probe_operator =
std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
// The probe-side inner sink is used to build a hash table from probe-side
// data after a spill, so it also takes `tnode_`, whose runtime filters were
// cleared above.
auto probe_side_inner_sink_operator =
std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
pool, tnode_, next_operator_id(), descs, partition_count);
probe_operator->set_inner_operators(probe_side_inner_sink_operator,
inner_probe_operator);
op = std::move(probe_operator);
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
_dag.insert({downstream_pipeline_id, {}});
}
PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());
auto inner_sink_operator =
std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
pool, next_sink_operator_id(), op->operator_id(), tnode_, descs,
partition_count);
RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
DataSinkOperatorPtr sink = std::move(sink_operator);
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
} else {
op.reset(new HashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
_dag.insert({downstream_pipeline_id, {}});
}
PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());
DataSinkOperatorPtr sink;
sink.reset(new HashJoinBuildSinkOperatorX(pool, next_sink_operator_id(),
op->operator_id(), tnode, descs));
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
}
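// When the hash table is shared for a broadcast join, a single build
// publishes it through a shared state; every instance gets its own sink
// dependency, and matching source dependencies let each probe wait on the
// shared table.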
if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
std::shared_ptr<HashJoinSharedState> shared_state =
HashJoinSharedState::create_shared(_num_instances);
for (int i = 0; i < _num_instances; i++) {
auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
"HASH_JOIN_BUILD_DEPENDENCY");
sink_dep->set_shared_state(shared_state.get());
shared_state->sink_deps.push_back(sink_dep);
}
shared_state->create_source_dependencies(_num_instances, op->operator_id(),
op->node_id(), "HASH_JOIN_PROBE");
_op_id_to_shared_state.insert(
{op->operator_id(), {shared_state, shared_state->sink_deps}});
}
_require_bucket_distribution =
_require_bucket_distribution || op->require_data_distribution();
break;
}
case TPlanNodeType::CROSS_JOIN_NODE: {
op.reset(new NestedLoopJoinProbeOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
_dag.insert({downstream_pipeline_id, {}});
}
PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());
DataSinkOperatorPtr sink;
sink.reset(new NestedLoopJoinBuildSinkOperatorX(pool, next_sink_operator_id(),
op->operator_id(), tnode, descs));
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
break;
}
case TPlanNodeType::UNION_NODE: {
int child_count = tnode.num_children;
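// A union is one source pipeline fed by `child_count` sink pipelines, one
// per child, each registered as an upstream dependency of the source
// pipeline.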
op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), descs));
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
_dag.insert({downstream_pipeline_id, {}});
}
for (int i = 0; i < child_count; i++) {
PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());
DataSinkOperatorPtr sink;
sink.reset(new UnionSinkOperatorX(i, next_sink_operator_id(), op->operator_id(), pool,
tnode, descs));
sink->set_followed_by_shuffled_operator(_require_bucket_distribution);
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
// Preset the child pipelines: when a child node finds this node as its
// parent, it builds into the prepared pipeline.
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
}
break;
}
case TPlanNodeType::SORT_NODE: {
const auto should_spill = _runtime_state->enable_spill() &&
tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
const bool use_local_merge =
tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
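// Pick the sort source variant: spill-capable (FULL_SORT only) when spill is
// enabled, local-merge when the FE requests it, otherwise the plain
// in-memory sort source.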
if (should_spill) {
op.reset(new SpillSortSourceOperatorX(pool, tnode, next_operator_id(), descs));
} else if (use_local_merge) {
op.reset(new LocalMergeSortSourceOperatorX(pool, tnode, next_operator_id(), descs));
} else {
op.reset(new SortSourceOperatorX(pool, tnode, next_operator_id(), descs));
}
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
_dag.insert({downstream_pipeline_id, {}});
}
cur_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorPtr sink;
if (should_spill) {
sink.reset(new SpillSortSinkOperatorX(pool, next_sink_operator_id(), op->operator_id(),
tnode, descs, _require_bucket_distribution));
} else {
sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), op->operator_id(),
tnode, descs, _require_bucket_distribution));
}
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
_require_bucket_distribution =
_require_bucket_distribution || sink->require_data_distribution();
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
break;
}
case doris::TPlanNodeType::PARTITION_SORT_NODE: {
op.reset(new PartitionSortSourceOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
_dag.insert({downstream_pipeline_id, {}});
}
cur_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorPtr sink;
sink.reset(new PartitionSortSinkOperatorX(pool, next_sink_operator_id(), op->operator_id(),
tnode, descs));
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
break;
}
case TPlanNodeType::ANALYTIC_EVAL_NODE: {
op.reset(new AnalyticSourceOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
_dag.insert({downstream_pipeline_id, {}});
}
cur_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorPtr sink;
sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), op->operator_id(),
tnode, descs, _require_bucket_distribution));
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
_require_bucket_distribution =
_require_bucket_distribution || sink->require_data_distribution();
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
break;
}
case TPlanNodeType::INTERSECT_NODE: {
RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
pool, tnode, descs, op, cur_pipe, parent_idx, child_idx, request));
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
break;
}
case TPlanNodeType::EXCEPT_NODE: {
RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
pool, tnode, descs, op, cur_pipe, parent_idx, child_idx, request));
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
break;
}
case TPlanNodeType::REPEAT_NODE: {
op.reset(new RepeatOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
break;
}
case TPlanNodeType::TABLE_FUNCTION_NODE: {
op.reset(new TableFunctionOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
break;
}
case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
op.reset(new AssertNumRowsOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
break;
}
case TPlanNodeType::EMPTY_SET_NODE: {
op.reset(new EmptySetSourceOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
break;
}
case TPlanNodeType::DATA_GEN_SCAN_NODE: {
op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
fe_with_old_version = !tnode.__isset.is_serial_operator;
break;
}
case TPlanNodeType::SCHEMA_SCAN_NODE: {
op.reset(new SchemaScanOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
break;
}
case TPlanNodeType::META_SCAN_NODE: {
op.reset(new MetaScanOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
break;
}
case TPlanNodeType::SELECT_NODE: {
op.reset(new SelectOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
break;
}
default:
return Status::InternalError("Unsupported exec type in pipeline: {}",
print_plan_node_type(tnode.node_type));
}
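// Compatibility path for older FEs that do not set `is_serial_operator`:
// size the pipeline by the FE-provided `parallel_instances` and mark the
// operator serial.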
if (request.__isset.parallel_instances && fe_with_old_version) {
cur_pipe->set_num_tasks(request.parallel_instances);
op->set_serial_operator();
}
return Status::OK();
}