void Task::start()

in velox/exec/Task.cpp [136:274]


void Task::start(
    std::shared_ptr<Task> self,
    uint32_t maxDrivers,
    uint32_t concurrentSplitGroups) {
  VELOX_CHECK_GE(
      maxDrivers, 1, "maxDrivers parameter must be greater than or equal to 1");
  VELOX_CHECK_GE(
      concurrentSplitGroups,
      1,
      "concurrentSplitGroups parameter must be greater than or equal to 1");
  VELOX_CHECK(self->drivers_.empty());
  self->concurrentSplitGroups_ = concurrentSplitGroups;
  {
    std::lock_guard<std::mutex> l(self->mutex_);
    self->taskStats_.executionStartTimeMs = getCurrentTimeMs();
  }

#if CODEGEN_ENABLED == 1
  const auto& config = self->queryCtx()->config();
  if (config.codegenEnabled() &&
      config.codegenConfigurationFilePath().length() != 0) {
    auto codegenLogger =
        std::make_shared<codegen::DefaultLogger>(self->taskId_);
    auto codegen = codegen::Codegen(codegenLogger);
    auto lazyLoading = config.codegenLazyLoading();
    codegen.initializeFromFile(
        config.codegenConfigurationFilePath(), lazyLoading);
    if (auto newPlanNode = codegen.compile(*(self->planFragment_.planNode))) {
      self->planFragment_.planNode = newPlanNode;
    }
  }
#endif

  // Here we create driver factories.
  LocalPlanner::plan(
      self->planFragment_,
      self->consumerSupplier(),
      &self->driverFactories_,
      maxDrivers);

  // Keep one exchange client per pipeline (NULL if not used).
  const auto numPipelines = self->driverFactories_.size();
  self->exchangeClients_.resize(numPipelines);

  // For ungrouped execution we reuse some structures used for grouped
  // execution and assume we have a single split group.
  const uint32_t numSplitGroups =
      std::max(1, self->planFragment_.numSplitGroups);

  // Each pipeline has a corresponding driver factory.
  // Here we count how many drivers we need in total and create the
  // per-pipeline stats.
  for (auto& factory : self->driverFactories_) {
    self->numDriversPerSplitGroup_ += factory->numDrivers;
    self->numTotalDrivers_ += factory->numTotalDrivers;
    self->taskStats_.pipelineStats.emplace_back(
        factory->inputDriver, factory->outputDriver);
  }

  // Register self for possible memory recovery callback. Do this
  // after sizing 'drivers_' but before starting the
  // Drivers. 'drivers_' can be read by memory recovery or
  // cancellation while Drivers are being made, so the array should
  // have final size from the start.

  auto bufferManager = self->bufferManager_.lock();
  VELOX_CHECK_NOT_NULL(
      bufferManager,
      "Unable to initialize task. "
      "PartitionedOutputBufferManager was already destructed");

  // In this loop we prepare the global state of pipelines: partitioned output
  // buffer and exchange client(s).
  for (auto pipeline = 0; pipeline < numPipelines; ++pipeline) {
    auto& factory = self->driverFactories_[pipeline];

    auto partitionedOutputNode = factory->needsPartitionedOutput();
    if (partitionedOutputNode) {
      self->numDriversInPartitionedOutput_ = factory->numDrivers;
      VELOX_CHECK(
          !self->hasPartitionedOutput_,
          "Only one output pipeline per task is supported");
      self->hasPartitionedOutput_ = true;
      bufferManager->initializeTask(
          self,
          partitionedOutputNode->isBroadcast(),
          partitionedOutputNode->numPartitions(),
          self->numDriversInPartitionedOutput_ * numSplitGroups);
    }

    if (factory->needsExchangeClient()) {
      // The low-water mark for filling the exchange queue is 1/2 of the
      // per-worker buffer size of the producers.
      self->exchangeClients_[pipeline] = std::make_shared<ExchangeClient>(
          self->destination_,
          self->queryCtx()->config().maxPartitionedOutputBufferSize() / 2);
    }
  }

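  // Note: a std::unique_lock is used (rather than std::lock_guard) because the
  // lock may be released early below when running on an inline executor.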
  std::unique_lock<std::mutex> l(self->mutex_);

  // For grouped execution we postpone driver creation until splits start
  // arriving, as we don't know which split groups we are going to get.
  // Here we create Drivers only for ungrouped (normal) execution.
  if (self->isUngroupedExecution()) {
    // Create the drivers we are going to run for this task.
    std::vector<std::shared_ptr<Driver>> drivers;
    drivers.reserve(self->numDriversPerSplitGroup_);
    self->createSplitGroupStateLocked(self, 0);
    self->createDriversLocked(self, 0, drivers);

    // Set and start all Drivers together inside 'mutex_' so that cancellations
    // and pauses have well-defined timing. For example, do not pause and
    // restart a task while it is still adding Drivers.
    // If the given executor is a folly::InlineLikeExecutor (or a subclass of
    // it), the drivers will be executed synchronously on the same thread as
    // the current task, so we need to release the lock to avoid a deadlock.
    self->drivers_ = std::move(drivers);
    if (dynamic_cast<const folly::InlineLikeExecutor*>(
            self->queryCtx()->executor())) {
      l.unlock();
    }
    for (auto& driver : self->drivers_) {
      if (driver) {
        ++self->numRunningDrivers_;
        Driver::enqueue(driver);
      }
    }
  } else {
    // Preallocate slots for the maximum number of drivers that can run
    // concurrently during grouped execution.
    self->drivers_.resize(
        self->numDriversPerSplitGroup_ * self->concurrentSplitGroups_);

    // As some splits could have been added before the task was started, ensure
    // we start running drivers for them.
    self->ensureSplitGroupsAreBeingProcessedLocked(self);
  }
}
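
For context, the sketch below shows how a caller typically drives this static start(): construct the Task as a shared_ptr, then hand that shared_ptr to Task::start() together with the driver limits. This is a minimal sketch only; the constructor shape Task(taskId, planFragment, destination, queryCtx) and the helper name startTaskExample are assumptions for illustration, so check the actual headers for exact signatures.

#include "velox/exec/Task.h"

void startTaskExample(
    facebook::velox::core::PlanFragment planFragment,
    std::shared_ptr<facebook::velox::core::QueryCtx> queryCtx) {
  using namespace facebook::velox;

  // The Task is owned via shared_ptr; start() takes the shared_ptr so the
  // task can register itself, e.g. with the PartitionedOutputBufferManager.
  auto task = std::make_shared<exec::Task>(
      "example-task-0", std::move(planFragment), /*destination=*/0, queryCtx);

  // maxDrivers bounds per-pipeline parallelism; concurrentSplitGroups only
  // matters for grouped execution. Both must be >= 1 (see the checks above).
  exec::Task::start(task, /*maxDrivers=*/4, /*concurrentSplitGroups=*/1);
}

Note that start() only enqueues the Drivers (or, for grouped execution, waits for split groups); the caller still observes completion through the task's state and stats rather than through the return of start().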