in velox/exec/Task.cpp [136:274]
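// Task::start() brings a constructed Task to life: it validates the driver
// limits, optionally rewrites the plan via codegen, builds one DriverFactory
// per pipeline, sets up output buffers and exchange clients, and finally
// creates and enqueues the Drivers (immediately for ungrouped execution,
// lazily per split group for grouped execution).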
void Task::start(
std::shared_ptr<Task> self,
uint32_t maxDrivers,
uint32_t concurrentSplitGroups) {
  VELOX_CHECK_GE(
      maxDrivers, 1, "maxDrivers parameter must be greater than or equal to 1");
  VELOX_CHECK_GE(
      concurrentSplitGroups,
      1,
      "concurrentSplitGroups parameter must be greater than or equal to 1");
VELOX_CHECK(self->drivers_.empty());
self->concurrentSplitGroups_ = concurrentSplitGroups;
{
std::lock_guard<std::mutex> l(self->mutex_);
self->taskStats_.executionStartTimeMs = getCurrentTimeMs();
}
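  // When codegen support is compiled in and enabled via the query config,
  // attempt to compile the plan and, on success, swap in the generated plan
  // node.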
#if CODEGEN_ENABLED == 1
const auto& config = self->queryCtx()->config();
if (config.codegenEnabled() &&
config.codegenConfigurationFilePath().length() != 0) {
auto codegenLogger =
std::make_shared<codegen::DefaultLogger>(self->taskId_);
auto codegen = codegen::Codegen(codegenLogger);
auto lazyLoading = config.codegenLazyLoading();
codegen.initializeFromFile(
config.codegenConfigurationFilePath(), lazyLoading);
if (auto newPlanNode = codegen.compile(*(self->planFragment_.planNode))) {
self->planFragment_.planNode = newPlanNode;
}
}
#endif
// Here we create driver factories.
LocalPlanner::plan(
self->planFragment_,
self->consumerSupplier(),
&self->driverFactories_,
maxDrivers);
// Keep one exchange client per pipeline (NULL if not used).
const auto numPipelines = self->driverFactories_.size();
self->exchangeClients_.resize(numPipelines);
  // For ungrouped execution we reuse some structures used for grouped
  // execution and assume we have "1 split group".
const uint32_t numSplitGroups =
std::max(1, self->planFragment_.numSplitGroups);
// For each pipeline we have a corresponding driver factory.
// Here we count how many drivers in total we need and create
// pipeline stats.
for (auto& factory : self->driverFactories_) {
self->numDriversPerSplitGroup_ += factory->numDrivers;
self->numTotalDrivers_ += factory->numTotalDrivers;
self->taskStats_.pipelineStats.emplace_back(
factory->inputDriver, factory->outputDriver);
}
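  // Note: 'numDriversPerSplitGroup_' counts the drivers needed for a single
  // split group, while 'numTotalDrivers_' covers all split groups; for
  // ungrouped execution the two are presumably equal, as there is only one
  // group.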
  // Register self for a possible memory-recovery callback. Do this after
  // sizing 'drivers_' but before starting the Drivers. 'drivers_' can be read
  // by memory recovery or cancellation while Drivers are being made, so the
  // array should have its final size from the start.
auto bufferManager = self->bufferManager_.lock();
VELOX_CHECK_NOT_NULL(
bufferManager,
"Unable to initialize task. "
"PartitionedOutputBufferManager was already destructed");
// In this loop we prepare the global state of pipelines: partitioned output
// buffer and exchange client(s).
  for (size_t pipeline = 0; pipeline < numPipelines; ++pipeline) {
auto& factory = self->driverFactories_[pipeline];
auto partitionedOutputNode = factory->needsPartitionedOutput();
if (partitionedOutputNode) {
self->numDriversInPartitionedOutput_ = factory->numDrivers;
VELOX_CHECK(
!self->hasPartitionedOutput_,
"Only one output pipeline per task is supported");
self->hasPartitionedOutput_ = true;
bufferManager->initializeTask(
self,
partitionedOutputNode->isBroadcast(),
partitionedOutputNode->numPartitions(),
self->numDriversInPartitionedOutput_ * numSplitGroups);
}
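    // Note: the driver count passed to initializeTask() above covers all
    // split groups, presumably so the buffer manager can tell when every
    // producer has finished.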
if (factory->needsExchangeClient()) {
      // Low-water mark for filling the exchange queue is 1/2 of the
      // per-worker buffer size of the producers.
self->exchangeClients_[pipeline] = std::make_shared<ExchangeClient>(
self->destination_,
self->queryCtx()->config().maxPartitionedOutputBufferSize() / 2);
}
}
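  // Everything below mutates 'drivers_' and related state, so it runs under
  // 'mutex_' to keep timing well defined with respect to concurrent
  // cancellation or pause requests.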
std::unique_lock<std::mutex> l(self->mutex_);
  // For grouped execution we postpone driver creation until the splits start
  // arriving, as we don't know in advance which split groups we are going to
  // get. Here we create Drivers only for ungrouped (normal) execution.
if (self->isUngroupedExecution()) {
// Create the drivers we are going to run for this task.
std::vector<std::shared_ptr<Driver>> drivers;
drivers.reserve(self->numDriversPerSplitGroup_);
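    // Ungrouped execution reuses the grouped-execution machinery with a
    // single split group, id 0.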
self->createSplitGroupStateLocked(self, 0);
self->createDriversLocked(self, 0, drivers);
    // Set and start all Drivers together inside 'mutex_' so that
    // cancellations and pauses have well-defined timing. For example, do not
    // pause and restart a task while it is still adding Drivers.
    // If the given executor is a folly::InlineLikeExecutor (or a child of
    // it), the drivers will be executed synchronously on the same thread as
    // the current task, so we need to release the lock to avoid a deadlock.
self->drivers_ = std::move(drivers);
if (dynamic_cast<const folly::InlineLikeExecutor*>(
self->queryCtx()->executor())) {
l.unlock();
}
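    // Hand each Driver to the executor; the null check skips any unpopulated
    // slots.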
for (auto& driver : self->drivers_) {
if (driver) {
++self->numRunningDrivers_;
Driver::enqueue(driver);
}
}
} else {
// Preallocate a bunch of slots for max concurrent drivers during grouped
// execution.
self->drivers_.resize(
self->numDriversPerSplitGroup_ * self->concurrentSplitGroups_);
// As some splits could have been added before the task start, ensure we
// start running drivers for them.
self->ensureSplitGroupsAreBeingProcessedLocked(self);
}
}
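
// A minimal usage sketch (hypothetical; Task constructor arguments elided
// and parameter values illustrative):
//
//   auto task = std::make_shared<Task>(/*...*/);
//   Task::start(task, /*maxDrivers=*/4, /*concurrentSplitGroups=*/1);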