in velox/exec/Driver.cpp [244:410]
StopReason Driver::runInternal(
std::shared_ptr<Driver>& self,
std::shared_ptr<BlockingState>* blockingState) {
// Update the next operator's queueTime.
if (curOpIndex_ < operators_.size()) {
operators_[curOpIndex_]->stats().addRuntimeStat(
"queuedWallNanos",
(getCurrentTimeMicro() - queueTimeStartMicros_) * 1'000);
}
// Get 'task_' into a local because this could be unhooked from it on another
// thread.
auto task = task_;
auto stop = !task ? StopReason::kTerminate : task->enter(state_);
if (stop != StopReason::kNone) {
if (stop == StopReason::kTerminate) {
// ctx_ still has a reference to the Task. 'this' is not on
// thread from the Task's viewpoint, hence no need to call
// close().
ctx_->task->setError(std::make_exception_ptr(VeloxRuntimeError(
__FILE__,
__LINE__,
__FUNCTION__,
"",
"Cancelled",
error_source::kErrorSourceRuntime,
error_code::kInvalidState,
false)));
}
return stop;
}
CancelGuard guard(task_.get(), &state_, [&](StopReason reason) {
// This is run on error or cancel exit.
if (reason == StopReason::kTerminate) {
task->setError(std::make_exception_ptr(VeloxRuntimeError(
__FILE__,
__LINE__,
__FUNCTION__,
"",
"Cancelled",
error_source::kErrorSourceRuntime,
error_code::kInvalidState,
false)));
}
close();
});
// Ensure we remove the writer we might have set when we exit this function in
// any way.
const auto statWriterGuard =
folly::makeGuard([]() { setRunTimeStatWriter(nullptr); });
try {
int32_t numOperators = operators_.size();
ContinueFuture future(false);
for (;;) {
for (int32_t i = numOperators - 1; i >= 0; --i) {
stop = task_->shouldStop();
if (stop != StopReason::kNone) {
guard.notThrown();
return stop;
}
auto op = operators_[i].get();
// In case we are blocked, this index will point to the operator, whose
// queuedTime we should update.
curOpIndex_ = i;
// Set up the runtime stats writer with the current operator, whose
// runtime stats would be updated (for instance time taken to load lazy
// vectors).
setRunTimeStatWriter(std::make_unique<OperatorRuntimeStatWriter>(op));
blockingReason_ = op->isBlocked(&future);
if (blockingReason_ != BlockingReason::kNotBlocked) {
*blockingState = std::make_shared<BlockingState>(
self, std::move(future), op, blockingReason_);
guard.notThrown();
return StopReason::kBlock;
}
Operator* nextOp = nullptr;
if (i < operators_.size() - 1) {
nextOp = operators_[i + 1].get();
blockingReason_ = nextOp->isBlocked(&future);
if (blockingReason_ != BlockingReason::kNotBlocked) {
*blockingState = std::make_shared<BlockingState>(
self, std::move(future), nextOp, blockingReason_);
guard.notThrown();
return StopReason::kBlock;
}
if (nextOp->needsInput()) {
uint64_t resultBytes = 0;
RowVectorPtr result;
{
CpuWallTimer timer(op->stats().getOutputTiming);
result = op->getOutput();
if (result) {
op->stats().outputPositions += result->size();
resultBytes = result->estimateFlatSize();
op->stats().outputBytes += resultBytes;
}
}
pushdownFilters(i);
if (result) {
CpuWallTimer timer(nextOp->stats().addInputTiming);
nextOp->stats().inputPositions += result->size();
nextOp->stats().inputBytes += resultBytes;
nextOp->addInput(result);
// The next iteration will see if operators_[i + 1] has
// output now that it got input.
i += 2;
continue;
} else {
stop = task_->shouldStop();
if (stop != StopReason::kNone) {
guard.notThrown();
return stop;
}
// The op is at end. If this is finishing, propagate the
// finish to the next op. The op could have run out
// because it is blocked. If the op is the source and it
// is not blocked and empty, this is finished. If this is
// not the source, just try to get output from the one
// before.
blockingReason_ = op->isBlocked(&future);
if (blockingReason_ != BlockingReason::kNotBlocked) {
*blockingState = std::make_shared<BlockingState>(
self, std::move(future), op, blockingReason_);
guard.notThrown();
return StopReason::kBlock;
}
if (op->isFinished()) {
CpuWallTimer timer(nextOp->stats().finishTiming);
nextOp->noMoreInput();
break;
}
}
}
} else {
// A sink (last) operator, after getting unblocked, gets
// control here so it can advance. If it is again blocked,
// this will be detected when trying to add input and we
// will come back here after this is again on thread.
{
CpuWallTimer timer(op->stats().getOutputTiming);
op->getOutput();
}
if (op->isFinished()) {
guard.notThrown();
close();
return StopReason::kAtEnd;
}
pushdownFilters(i);
continue;
}
}
}
} catch (velox::VeloxException& e) {
task_->setError(std::current_exception());
// The CancelPoolGuard will close 'self' and remove from task_.
return StopReason::kAlreadyTerminated;
} catch (std::exception& e) {
task_->setError(std::current_exception());
// The CancelGuard will close 'self' and remove from task_.
return StopReason::kAlreadyTerminated;
}
}