in velox/exec/LocalPlanner.cpp [274:423]
std::shared_ptr<Driver> DriverFactory::createDriver(
std::unique_ptr<DriverCtx> ctx,
std::shared_ptr<ExchangeClient> exchangeClient,
std::function<int(int pipelineId)> numDrivers) {
std::vector<std::unique_ptr<Operator>> operators;
operators.reserve(planNodes.size());
for (int32_t i = 0; i < planNodes.size(); i++) {
// Id of the Operator being made. This is not the same as 'i'
// because some PlanNodes may get fused.
auto id = operators.size();
auto planNode = planNodes[i];
if (auto filterNode =
std::dynamic_pointer_cast<const core::FilterNode>(planNode)) {
if (i < planNodes.size() - 1) {
auto next = planNodes[i + 1];
if (auto projectNode =
std::dynamic_pointer_cast<const core::ProjectNode>(next)) {
operators.push_back(std::make_unique<FilterProject>(
id, ctx.get(), filterNode, projectNode));
i++;
continue;
}
}
operators.push_back(
std::make_unique<FilterProject>(id, ctx.get(), filterNode, nullptr));
} else if (
auto projectNode =
std::dynamic_pointer_cast<const core::ProjectNode>(planNode)) {
operators.push_back(
std::make_unique<FilterProject>(id, ctx.get(), nullptr, projectNode));
} else if (
auto valuesNode =
std::dynamic_pointer_cast<const core::ValuesNode>(planNode)) {
operators.push_back(std::make_unique<Values>(id, ctx.get(), valuesNode));
} else if (
auto tableScanNode =
std::dynamic_pointer_cast<const core::TableScanNode>(planNode)) {
operators.push_back(
std::make_unique<TableScan>(id, ctx.get(), tableScanNode));
} else if (
auto tableWriteNode =
std::dynamic_pointer_cast<const core::TableWriteNode>(planNode)) {
operators.push_back(
std::make_unique<TableWriter>(id, ctx.get(), tableWriteNode));
} else if (
auto mergeExchangeNode =
std::dynamic_pointer_cast<const core::MergeExchangeNode>(
planNode)) {
operators.push_back(
std::make_unique<MergeExchange>(i, ctx.get(), mergeExchangeNode));
} else if (
auto exchangeNode =
std::dynamic_pointer_cast<const core::ExchangeNode>(planNode)) {
operators.push_back(std::make_unique<Exchange>(
id, ctx.get(), exchangeNode, exchangeClient));
} else if (
auto partitionedOutputNode =
std::dynamic_pointer_cast<const core::PartitionedOutputNode>(
planNode)) {
operators.push_back(std::make_unique<PartitionedOutput>(
id, ctx.get(), partitionedOutputNode));
} else if (
auto joinNode =
std::dynamic_pointer_cast<const core::HashJoinNode>(planNode)) {
operators.push_back(std::make_unique<HashProbe>(id, ctx.get(), joinNode));
} else if (
auto joinNode =
std::dynamic_pointer_cast<const core::CrossJoinNode>(planNode)) {
operators.push_back(
std::make_unique<CrossJoinProbe>(id, ctx.get(), joinNode));
} else if (
auto aggregationNode =
std::dynamic_pointer_cast<const core::AggregationNode>(planNode)) {
if (!aggregationNode->preGroupedKeys().empty() &&
aggregationNode->preGroupedKeys().size() ==
aggregationNode->groupingKeys().size()) {
operators.push_back(std::make_unique<StreamingAggregation>(
id, ctx.get(), aggregationNode));
} else {
operators.push_back(
std::make_unique<HashAggregation>(id, ctx.get(), aggregationNode));
}
} else if (
auto topNNode =
std::dynamic_pointer_cast<const core::TopNNode>(planNode)) {
operators.push_back(std::make_unique<TopN>(id, ctx.get(), topNNode));
} else if (
auto limitNode =
std::dynamic_pointer_cast<const core::LimitNode>(planNode)) {
operators.push_back(std::make_unique<Limit>(id, ctx.get(), limitNode));
} else if (
auto orderByNode =
std::dynamic_pointer_cast<const core::OrderByNode>(planNode)) {
operators.push_back(
std::make_unique<OrderBy>(id, ctx.get(), orderByNode));
} else if (
auto localMerge =
std::dynamic_pointer_cast<const core::LocalMergeNode>(planNode)) {
auto localMergeOp =
std::make_unique<LocalMerge>(id, ctx.get(), localMerge);
operators.push_back(std::move(localMergeOp));
} else if (
auto mergeJoin =
std::dynamic_pointer_cast<const core::MergeJoinNode>(planNode)) {
auto mergeJoinOp = std::make_unique<MergeJoin>(id, ctx.get(), mergeJoin);
ctx->task->createMergeJoinSource(ctx->splitGroupId, mergeJoin->id());
operators.push_back(std::move(mergeJoinOp));
} else if (
auto localPartitionNode =
std::dynamic_pointer_cast<const core::LocalPartitionNode>(
planNode)) {
operators.push_back(std::make_unique<LocalExchangeSourceOperator>(
id,
ctx.get(),
localPartitionNode->outputType(),
localPartitionNode->id(),
ctx->partitionId));
} else if (
auto unnest =
std::dynamic_pointer_cast<const core::UnnestNode>(planNode)) {
operators.push_back(std::make_unique<Unnest>(id, ctx.get(), unnest));
} else if (
auto enforceSingleRow =
std::dynamic_pointer_cast<const core::EnforceSingleRowNode>(
planNode)) {
operators.push_back(
std::make_unique<EnforceSingleRow>(id, ctx.get(), enforceSingleRow));
} else if (
auto assignUniqueIdNode =
std::dynamic_pointer_cast<const core::AssignUniqueIdNode>(
planNode)) {
operators.push_back(std::make_unique<AssignUniqueId>(
id,
ctx.get(),
assignUniqueIdNode,
assignUniqueIdNode->taskUniqueId(),
assignUniqueIdNode->uniqueIdCounter()));
} else {
auto extended = Operator::fromPlanNode(ctx.get(), id, planNode);
VELOX_CHECK(extended, "Unsupported plan node: {}", planNode->toString());
operators.push_back(std::move(extended));
}
}
if (consumerSupplier) {
operators.push_back(consumerSupplier(operators.size(), ctx.get()));
}
return std::make_shared<Driver>(std::move(ctx), std::move(operators));
}