uint32_t maxDrivers()

in velox/exec/LocalPlanner.cpp [165:240]


uint32_t maxDrivers(const DriverFactory& driverFactory) {
  uint32_t count = maxDriversForConsumer(driverFactory.consumerNode);
  if (count == 1) {
    return count;
  }
  for (auto& node : driverFactory.planNodes) {
    if (auto aggregation =
            std::dynamic_pointer_cast<const core::AggregationNode>(node)) {
      if (aggregation->step() == core::AggregationNode::Step::kFinal ||
          aggregation->step() == core::AggregationNode::Step::kSingle) {
        // final aggregations must run single-threaded
        return 1;
      }
    } else if (
        auto topN = std::dynamic_pointer_cast<const core::TopNNode>(node)) {
      if (!topN->isPartial()) {
        // final topN must run single-threaded
        return 1;
      }
    } else if (
        auto values = std::dynamic_pointer_cast<const core::ValuesNode>(node)) {
      // values node must run single-threaded, unless in test context
      if (!values->isParallelizable()) {
        return 1;
      }
    } else if (
        auto limit = std::dynamic_pointer_cast<const core::LimitNode>(node)) {
      // final limit must run single-threaded
      if (!limit->isPartial()) {
        return 1;
      }
    } else if (
        auto orderBy =
            std::dynamic_pointer_cast<const core::OrderByNode>(node)) {
      // final orderby must run single-threaded
      if (!orderBy->isPartial()) {
        return 1;
      }
    } else if (
        auto localMerge =
            std::dynamic_pointer_cast<const core::LocalMergeNode>(node)) {
      // Local merge must run single-threaded.
      return 1;
    } else if (
        auto mergeExchange =
            std::dynamic_pointer_cast<const core::MergeExchangeNode>(node)) {
      // MergeExchange must run single-threaded.
      return 1;
    } else if (std::dynamic_pointer_cast<const core::MergeJoinNode>(node)) {
      // MergeJoinNode must run single-threaded.
      return 1;
    } else if (
        auto tableWrite =
            std::dynamic_pointer_cast<const core::TableWriteNode>(node)) {
      if (!tableWrite->insertTableHandle()
               ->connectorInsertTableHandle()
               ->supportsMultiThreading()) {
        return 1;
      }
    } else {
      auto result = Operator::maxDrivers(node);
      if (result) {
        VELOX_CHECK_GT(
            *result,
            0,
            "maxDrivers must be greater than 0. Plan node: {}",
            node->toString())
        if (*result == 1) {
          return 1;
        }
        count = std::min(*result, count);
      }
    }
  }
  return count;
}