in velox/exec/LocalPlanner.cpp [165:240]
uint32_t maxDrivers(const DriverFactory& driverFactory) {
uint32_t count = maxDriversForConsumer(driverFactory.consumerNode);
if (count == 1) {
return count;
}
for (auto& node : driverFactory.planNodes) {
if (auto aggregation =
std::dynamic_pointer_cast<const core::AggregationNode>(node)) {
if (aggregation->step() == core::AggregationNode::Step::kFinal ||
aggregation->step() == core::AggregationNode::Step::kSingle) {
// final aggregations must run single-threaded
return 1;
}
} else if (
auto topN = std::dynamic_pointer_cast<const core::TopNNode>(node)) {
if (!topN->isPartial()) {
// final topN must run single-threaded
return 1;
}
} else if (
auto values = std::dynamic_pointer_cast<const core::ValuesNode>(node)) {
// values node must run single-threaded, unless in test context
if (!values->isParallelizable()) {
return 1;
}
} else if (
auto limit = std::dynamic_pointer_cast<const core::LimitNode>(node)) {
// final limit must run single-threaded
if (!limit->isPartial()) {
return 1;
}
} else if (
auto orderBy =
std::dynamic_pointer_cast<const core::OrderByNode>(node)) {
// final orderby must run single-threaded
if (!orderBy->isPartial()) {
return 1;
}
} else if (
auto localMerge =
std::dynamic_pointer_cast<const core::LocalMergeNode>(node)) {
// Local merge must run single-threaded.
return 1;
} else if (
auto mergeExchange =
std::dynamic_pointer_cast<const core::MergeExchangeNode>(node)) {
// MergeExchange must run single-threaded.
return 1;
} else if (std::dynamic_pointer_cast<const core::MergeJoinNode>(node)) {
// MergeJoinNode must run single-threaded.
return 1;
} else if (
auto tableWrite =
std::dynamic_pointer_cast<const core::TableWriteNode>(node)) {
if (!tableWrite->insertTableHandle()
->connectorInsertTableHandle()
->supportsMultiThreading()) {
return 1;
}
} else {
auto result = Operator::maxDrivers(node);
if (result) {
VELOX_CHECK_GT(
*result,
0,
"maxDrivers must be greater than 0. Plan node: {}",
node->toString())
if (*result == 1) {
return 1;
}
count = std::min(*result, count);
}
}
}
return count;
}