StopReason Driver::runInternal()

in velox/exec/Driver.cpp [244:410]


StopReason Driver::runInternal(
    std::shared_ptr<Driver>& self,
    std::shared_ptr<BlockingState>* blockingState) {
  // Update the next operator's queueTime.
  if (curOpIndex_ < operators_.size()) {
    operators_[curOpIndex_]->stats().addRuntimeStat(
        "queuedWallNanos",
        (getCurrentTimeMicro() - queueTimeStartMicros_) * 1'000);
  }
  // Get 'task_' into a local because this could be unhooked from it on another
  // thread.
  auto task = task_;
  auto stop = !task ? StopReason::kTerminate : task->enter(state_);
  if (stop != StopReason::kNone) {
    if (stop == StopReason::kTerminate) {
      // ctx_ still has a reference to the Task. 'this' is not on
      // thread from the Task's viewpoint, hence no need to call
      // close().
      ctx_->task->setError(std::make_exception_ptr(VeloxRuntimeError(
          __FILE__,
          __LINE__,
          __FUNCTION__,
          "",
          "Cancelled",
          error_source::kErrorSourceRuntime,
          error_code::kInvalidState,
          false)));
    }
    return stop;
  }

  CancelGuard guard(task_.get(), &state_, [&](StopReason reason) {
    // This is run on error or cancel exit.
    if (reason == StopReason::kTerminate) {
      task->setError(std::make_exception_ptr(VeloxRuntimeError(
          __FILE__,
          __LINE__,
          __FUNCTION__,
          "",
          "Cancelled",
          error_source::kErrorSourceRuntime,
          error_code::kInvalidState,
          false)));
    }
    close();
  });

  // Ensure we remove the writer we might have set when we exit this function in
  // any way.
  const auto statWriterGuard =
      folly::makeGuard([]() { setRunTimeStatWriter(nullptr); });

  try {
    int32_t numOperators = operators_.size();
    ContinueFuture future(false);

    for (;;) {
      for (int32_t i = numOperators - 1; i >= 0; --i) {
        stop = task_->shouldStop();
        if (stop != StopReason::kNone) {
          guard.notThrown();
          return stop;
        }

        auto op = operators_[i].get();
        // In case we are blocked, this index will point to the operator, whose
        // queuedTime we should update.
        curOpIndex_ = i;
        // Set up the runtime stats writer with the current operator, whose
        // runtime stats would be updated (for instance time taken to load lazy
        // vectors).
        setRunTimeStatWriter(std::make_unique<OperatorRuntimeStatWriter>(op));

        blockingReason_ = op->isBlocked(&future);
        if (blockingReason_ != BlockingReason::kNotBlocked) {
          *blockingState = std::make_shared<BlockingState>(
              self, std::move(future), op, blockingReason_);
          guard.notThrown();
          return StopReason::kBlock;
        }
        Operator* nextOp = nullptr;
        if (i < operators_.size() - 1) {
          nextOp = operators_[i + 1].get();
          blockingReason_ = nextOp->isBlocked(&future);
          if (blockingReason_ != BlockingReason::kNotBlocked) {
            *blockingState = std::make_shared<BlockingState>(
                self, std::move(future), nextOp, blockingReason_);
            guard.notThrown();
            return StopReason::kBlock;
          }
          if (nextOp->needsInput()) {
            uint64_t resultBytes = 0;
            RowVectorPtr result;
            {
              CpuWallTimer timer(op->stats().getOutputTiming);
              result = op->getOutput();
              if (result) {
                op->stats().outputPositions += result->size();
                resultBytes = result->estimateFlatSize();
                op->stats().outputBytes += resultBytes;
              }
            }
            pushdownFilters(i);
            if (result) {
              CpuWallTimer timer(nextOp->stats().addInputTiming);
              nextOp->stats().inputPositions += result->size();
              nextOp->stats().inputBytes += resultBytes;
              nextOp->addInput(result);
              // The next iteration will see if operators_[i + 1] has
              // output now that it got input.
              i += 2;
              continue;
            } else {
              stop = task_->shouldStop();
              if (stop != StopReason::kNone) {
                guard.notThrown();
                return stop;
              }
              // The op is at end. If this is finishing, propagate the
              // finish to the next op. The op could have run out
              // because it is blocked. If the op is the source and it
              // is not blocked and empty, this is finished. If this is
              // not the source, just try to get output from the one
              // before.
              blockingReason_ = op->isBlocked(&future);
              if (blockingReason_ != BlockingReason::kNotBlocked) {
                *blockingState = std::make_shared<BlockingState>(
                    self, std::move(future), op, blockingReason_);
                guard.notThrown();
                return StopReason::kBlock;
              }
              if (op->isFinished()) {
                CpuWallTimer timer(nextOp->stats().finishTiming);
                nextOp->noMoreInput();
                break;
              }
            }
          }
        } else {
          // A sink (last) operator, after getting unblocked, gets
          // control here so it can advance. If it is again blocked,
          // this will be detected when trying to add input and we
          // will come back here after this is again on thread.
          {
            CpuWallTimer timer(op->stats().getOutputTiming);
            op->getOutput();
          }
          if (op->isFinished()) {
            guard.notThrown();
            close();
            return StopReason::kAtEnd;
          }
          pushdownFilters(i);
          continue;
        }
      }
    }
  } catch (velox::VeloxException& e) {
    task_->setError(std::current_exception());
    // The CancelPoolGuard will close 'self' and remove from task_.
    return StopReason::kAlreadyTerminated;
  } catch (std::exception& e) {
    task_->setError(std::current_exception());
    // The CancelGuard will close 'self' and remove from task_.
    return StopReason::kAlreadyTerminated;
  }
}