velox/exec/Driver.cpp (491 lines of code) (raw):
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/ScopeGuard.h>
#include <folly/executors/QueuedImmediateExecutor.h>
#include <folly/executors/task_queue/UnboundedBlockingQueue.h>
#include <folly/executors/thread_factory/InitThreadFactory.h>
#include <gflags/gflags.h>
#include "velox/common/time/Timer.h"
#include "velox/exec/Operator.h"
#include "velox/exec/Task.h"
namespace facebook::velox::exec {
// Builds the context shared by the operators of one Driver: stores the
// identifying ids, keeps a reference to the owning Task and obtains a
// driver-level memory pool from it via Task::addDriverPool().
DriverCtx::DriverCtx(
    std::shared_ptr<Task> _task,
    int _driverId,
    int _pipelineId,
    uint32_t _splitGroupId,
    uint32_t _partitionId)
    : driverId(_driverId),
      pipelineId(_pipelineId),
      splitGroupId(_splitGroupId),
      partitionId(_partitionId),
      // '_task' is a by-value sink parameter: move it into the member instead
      // of paying for an extra atomic reference-count increment/decrement.
      task(std::move(_task)),
      // Reads the member 'task' (already initialized above), never the
      // moved-from parameter.
      pool(task->addDriverPool()) {}
// Returns the configuration held by the query context of this driver's task.
const core::QueryConfig& DriverCtx::queryConfig() const {
  const auto& queryCtx = task->queryCtx();
  return queryCtx->config();
}
// Asks the task for an operator-level memory pool, passing this driver's pool
// to Task::addOperatorPool().
velox::memory::MemoryPool* FOLLY_NONNULL DriverCtx::addOperatorPool() {
  velox::memory::MemoryPool* operatorPool = task->addOperatorPool(pool);
  return operatorPool;
}
// Captures everything needed to resume 'driver' once 'future' is realized:
// the driver, the future, the blocked operator, the blocking reason and the
// time (microseconds since the epoch of high_resolution_clock) at which the
// block started.
BlockingState::BlockingState(
    std::shared_ptr<Driver> driver,
    ContinueFuture&& future,
    Operator* op,
    BlockingReason reason)
    : driver_(std::move(driver)),
      future_(std::move(future)),
      operator_(op),
      reason_(reason),
      sinceMicros_(std::chrono::time_point_cast<std::chrono::microseconds>(
                       std::chrono::high_resolution_clock::now())
                       .time_since_epoch()
                       .count()) {
  // Flag the driver as having a pending future before it leaves the thread.
  driver_->state().hasBlockingFuture = true;
}
// static
void BlockingState::setResume(std::shared_ptr<BlockingState> state) {
VELOX_CHECK(!state->driver_->isOnThread());
auto& exec = folly::QueuedImmediateExecutor::instance();
std::move(state->future_)
.via(&exec)
.thenValue([state](bool /* unused */) {
state->operator_->recordBlockingTime(state->sinceMicros_);
auto driver = state->driver_;
auto task = driver->task();
if (!task) {
//'driver' is already removed from its task. No Just drop remaining
// references.
return;
}
{
std::lock_guard<std::mutex> l(task->mutex());
VELOX_CHECK(!driver->state().isSuspended);
VELOX_CHECK(driver->state().hasBlockingFuture);
driver->state().hasBlockingFuture = false;
if (task->pauseRequested()) {
// The thread will be enqueued at resume.
return;
}
Driver::enqueue(state->driver_);
}
})
.thenError(
folly::tag_t<std::exception>{}, [state](std::exception const& e) {
LOG(ERROR)
<< "A ContinueFuture should not be realized with an error"
<< e.what();
state->driver_->setError(std::make_exception_ptr(e));
});
}
namespace {
// Scope guard used while a Driver runs on a thread. Its destructor always
// calls Task::leave() for the guarded ThreadState, so the thread is removed
// from its Task's thread count on every exit path.
//
// The guard assumes the scope is being left because of an exception unless
// notThrown() was called first. 'onTerminate' is invoked at most once: with
// StopReason::kNone for an exception exit, or with StopReason::kTerminate when
// Task::leave() reports that termination was requested.
class CancelGuard {
 public:
  CancelGuard(
      Task* task,
      ThreadState* state,
      std::function<void(StopReason)> onTerminate)
      : task_(task), state_(state), onTerminate_(onTerminate) {}

  // Declares that the scope is being left through a regular return.
  void notThrown() {
    isThrow_ = false;
  }

  ~CancelGuard() {
    const bool exitedByThrow = isThrow_;
    if (exitedByThrow) {
      // Runtime error. The Driver is still on thread, so touching its state
      // here is safe.
      state_->isTerminated = true;
      onTerminate_(StopReason::kNone);
    }
    const auto leaveReason = task_->leave(*state_);
    // Termination was requested via the Task. The Driver is no longer on
    // thread but is marked terminated, so no other thread will enter it. Skip
    // the callback if the exception path above has already run it.
    if (leaveReason == StopReason::kTerminate && !exitedByThrow) {
      onTerminate_(leaveReason);
    }
  }

 private:
  Task* task_;
  ThreadState* state_;
  std::function<void(StopReason reason)> onTerminate_;
  // True until notThrown() is called.
  bool isThrow_ = true;
};
} // namespace
// A Driver is expected to have dropped its Task reference before it is
// destroyed; complain loudly (and fail in debug builds) otherwise.
Driver::~Driver() {
  if (!task_) {
    return;
  }
  LOG(ERROR) << "Driver destructed while still in Task: "
             << task_->toString();
  DLOG(FATAL) << "Driver destructed while referencing task";
}
// static
// Marks 'driver' as enqueued and schedules Driver::run() for it on the
// executor of its query context. Nothing is scheduled if the driver no longer
// references a Task.
void Driver::enqueue(std::shared_ptr<Driver> driver) {
  // Callers are expected to hold the mutex of the Driver's Task.
  driver->enqueueInternal();
  if (const auto& owningTask = driver->task_; owningTask) {
    owningTask->queryCtx()->executor()->add(
        [driver]() { Driver::run(driver); });
  }
}
// Takes ownership of the context and of the operator pipeline. The Task
// reference is copied out of the context.
Driver::Driver(
    std::unique_ptr<DriverCtx> ctx,
    std::vector<std::unique_ptr<Operator>>&& operators)
    : ctx_(std::move(ctx)),
      task_(ctx_->task),
      operators_(std::move(operators)) {
  // Operators reach their Driver through the context when adapting.
  ctx_->driver = this;
  // Start with the last operator of the pipeline as the current one.
  curOpIndex_ = operators_.size() - 1;
}
namespace {
/// Checks if output channel is produced using identity projection and returns
/// input channel if so.
std::optional<ChannelIndex> getIdentityProjection(
const std::vector<IdentityProjection>& projections,
ChannelIndex outputChannel) {
for (const auto& projection : projections) {
if (projection.outputChannel == outputChannel) {
return projection.inputChannel;
}
}
return std::nullopt;
}
} // namespace
void Driver::pushdownFilters(int operatorIndex) {
auto op = operators_[operatorIndex].get();
const auto& filters = op->getDynamicFilters();
if (filters.empty()) {
return;
}
op->stats().addRuntimeStat("dynamicFiltersProduced", filters.size());
// Walk operator list upstream and find a place to install the filters.
for (const auto& entry : filters) {
auto channel = entry.first;
for (auto i = operatorIndex - 1; i >= 0; --i) {
auto prevOp = operators_[i].get();
if (i == 0) {
// Source operator.
VELOX_CHECK(
prevOp->canAddDynamicFilter(),
"Cannot push down dynamic filters produced by {}",
op->toString());
prevOp->addDynamicFilter(channel, entry.second);
prevOp->stats().addRuntimeStat("dynamicFiltersAccepted", 1);
break;
}
const auto& identityProjections = prevOp->identityProjections();
auto inputChannel = getIdentityProjection(identityProjections, channel);
if (!inputChannel.has_value()) {
// Filter channel is not an identity projection.
VELOX_CHECK(
prevOp->canAddDynamicFilter(),
"Cannot push down dynamic filters produced by {}",
op->toString());
prevOp->addDynamicFilter(channel, entry.second);
prevOp->stats().addRuntimeStat("dynamicFiltersAccepted", 1);
break;
}
// Continue walking upstream.
channel = inputChannel.value();
}
}
op->clearDynamicFilters();
}
// Flags this Driver as enqueued and remembers when it entered the queue.
// Fails if the Driver is already enqueued.
void Driver::enqueueInternal() {
  VELOX_CHECK(!state_.isEnqueued);
  // Queue time is measured from this point on.
  queueTimeStartMicros_ = getCurrentTimeMicro();
  state_.isEnqueued = true;
}
// Runs the operator pipeline of this Driver on the calling thread until it
// has to stop.
//
// 'self' is the shared pointer owning this Driver; it is stored in the
// BlockingState created when an operator reports that it is blocked.
// 'blockingState' receives that BlockingState when the result is
// StopReason::kBlock.
//
// Returns:
//  - the reason reported by Task::enter() or Task::shouldStop() when the Task
//    asks the Driver to stop;
//  - StopReason::kBlock when an operator is blocked;
//  - StopReason::kAtEnd when the last operator is finished (the Driver is
//    closed before returning);
//  - StopReason::kAlreadyTerminated when an operator threw; the error is set
//    on the Task and the CancelGuard closes the Driver.
StopReason Driver::runInternal(
    std::shared_ptr<Driver>& self,
    std::shared_ptr<BlockingState>* blockingState) {
  // Update the next operator's queueTime.
  if (curOpIndex_ < operators_.size()) {
    operators_[curOpIndex_]->stats().addRuntimeStat(
        "queuedWallNanos",
        (getCurrentTimeMicro() - queueTimeStartMicros_) * 1'000);
  }

  // Get 'task_' into a local because this could be unhooked from it on another
  // thread. Everything below goes through 'task' rather than 'task_': it keeps
  // the Task alive and stays valid after close() has reset 'task_'.
  auto task = task_;
  auto stop = !task ? StopReason::kTerminate : task->enter(state_);
  if (stop != StopReason::kNone) {
    if (stop == StopReason::kTerminate) {
      // ctx_ still has a reference to the Task. 'this' is not on
      // thread from the Task's viewpoint, hence no need to call
      // close().
      ctx_->task->setError(std::make_exception_ptr(VeloxRuntimeError(
          __FILE__,
          __LINE__,
          __FUNCTION__,
          "",
          "Cancelled",
          error_source::kErrorSourceRuntime,
          error_code::kInvalidState,
          false)));
    }
    return stop;
  }

  CancelGuard guard(task.get(), &state_, [&](StopReason reason) {
    // This is run on error or cancel exit.
    if (reason == StopReason::kTerminate) {
      task->setError(std::make_exception_ptr(VeloxRuntimeError(
          __FILE__,
          __LINE__,
          __FUNCTION__,
          "",
          "Cancelled",
          error_source::kErrorSourceRuntime,
          error_code::kInvalidState,
          false)));
    }
    close();
  });

  // Ensure we remove the writer we might have set when we exit this function in
  // any way.
  const auto statWriterGuard =
      folly::makeGuard([]() { setRunTimeStatWriter(nullptr); });

  try {
    int32_t numOperators = operators_.size();
    ContinueFuture future(false);

    for (;;) {
      for (int32_t i = numOperators - 1; i >= 0; --i) {
        stop = task->shouldStop();
        if (stop != StopReason::kNone) {
          guard.notThrown();
          return stop;
        }

        auto op = operators_[i].get();
        // In case we are blocked, this index will point to the operator, whose
        // queuedTime we should update.
        curOpIndex_ = i;
        // Set up the runtime stats writer with the current operator, whose
        // runtime stats would be updated (for instance time taken to load lazy
        // vectors).
        setRunTimeStatWriter(std::make_unique<OperatorRuntimeStatWriter>(op));

        blockingReason_ = op->isBlocked(&future);
        if (blockingReason_ != BlockingReason::kNotBlocked) {
          *blockingState = std::make_shared<BlockingState>(
              self, std::move(future), op, blockingReason_);
          guard.notThrown();
          return StopReason::kBlock;
        }

        Operator* nextOp = nullptr;
        // Compare in the signed domain; 'numOperators' mirrors
        // operators_.size().
        if (i < numOperators - 1) {
          nextOp = operators_[i + 1].get();
          blockingReason_ = nextOp->isBlocked(&future);
          if (blockingReason_ != BlockingReason::kNotBlocked) {
            *blockingState = std::make_shared<BlockingState>(
                self, std::move(future), nextOp, blockingReason_);
            guard.notThrown();
            return StopReason::kBlock;
          }
          if (nextOp->needsInput()) {
            uint64_t resultBytes = 0;
            RowVectorPtr result;
            {
              CpuWallTimer timer(op->stats().getOutputTiming);
              result = op->getOutput();
              if (result) {
                op->stats().outputPositions += result->size();
                resultBytes = result->estimateFlatSize();
                op->stats().outputBytes += resultBytes;
              }
            }
            pushdownFilters(i);
            if (result) {
              CpuWallTimer timer(nextOp->stats().addInputTiming);
              nextOp->stats().inputPositions += result->size();
              nextOp->stats().inputBytes += resultBytes;
              nextOp->addInput(result);
              // The next iteration will see if operators_[i + 1] has
              // output now that it got input.
              i += 2;
              continue;
            } else {
              stop = task->shouldStop();
              if (stop != StopReason::kNone) {
                guard.notThrown();
                return stop;
              }
              // The op is at end. If this is finishing, propagate the
              // finish to the next op. The op could have run out
              // because it is blocked. If the op is the source and it
              // is not blocked and empty, this is finished. If this is
              // not the source, just try to get output from the one
              // before.
              blockingReason_ = op->isBlocked(&future);
              if (blockingReason_ != BlockingReason::kNotBlocked) {
                *blockingState = std::make_shared<BlockingState>(
                    self, std::move(future), op, blockingReason_);
                guard.notThrown();
                return StopReason::kBlock;
              }
              if (op->isFinished()) {
                CpuWallTimer timer(nextOp->stats().finishTiming);
                nextOp->noMoreInput();
                break;
              }
            }
          }
        } else {
          // A sink (last) operator, after getting unblocked, gets
          // control here so it can advance. If it is again blocked,
          // this will be detected when trying to add input and we
          // will come back here after this is again on thread.
          {
            CpuWallTimer timer(op->stats().getOutputTiming);
            op->getOutput();
          }
          if (op->isFinished()) {
            guard.notThrown();
            close();
            return StopReason::kAtEnd;
          }
          pushdownFilters(i);
          continue;
        }
      }
    }
  } catch (const std::exception&) {
    // velox::VeloxException derives from std::exception, so this single
    // handler covers both. 'task_' may already have been reset by close(), so
    // report through the local 'task'.
    task->setError(std::current_exception());
    // The CancelGuard will close 'self' and remove from task_.
    return StopReason::kAlreadyTerminated;
  }
}
// static
// Executor entry point: runs 'self' until it stops and then acts on the stop
// reason. A blocked Driver gets its resume continuation installed, a yielding
// Driver is re-enqueued, and the remaining known reasons need no action.
void Driver::run(std::shared_ptr<Driver> self) {
  std::shared_ptr<BlockingState> blockingState;
  const auto reason = self->runInternal(self, &blockingState);

  if (reason == StopReason::kBlock) {
    // Set the resume action outside of the Task so that, if the
    // future is already realized we do not have a second thread
    // entering the same Driver.
    BlockingState::setResume(blockingState);
    return;
  }

  if (reason == StopReason::kYield) {
    // Go to the end of the queue.
    enqueue(self);
    return;
  }

  const bool nothingToDo = reason == StopReason::kPause ||
      reason == StopReason::kTerminate ||
      reason == StopReason::kAlreadyTerminated ||
      reason == StopReason::kAtEnd;
  if (nothingToDo) {
    return;
  }

  VELOX_CHECK(false, "Unhandled stop reason");
}
void Driver::initializeOperatorStats(std::vector<OperatorStats>& stats) {
stats.resize(operators_.size(), OperatorStats(0, 0, "", ""));
// initialize the place in stats given by the operatorId. Use the
// operatorId instead of i as the index to document the usage. The
// operators are sequentially numbered but they could be reordered
// in the pipeline later, so the ordinal position of the Operator is
// not always the index into the stats.
for (auto& op : operators_) {
auto id = op->stats().operatorId;
assert(id < stats.size());
stats[id] = op->stats();
}
}
// Refreshes the memory stats of every operator from its pool's usage tracker
// and hands the operator's stats to the Task.
void Driver::addStatsToTask() {
  for (auto it = operators_.begin(); it != operators_.end(); ++it) {
    auto* const op = it->get();
    auto& opStats = op->stats();
    opStats.memoryStats.update(op->pool()->getMemoryUsageTracker());
    task_->addOperatorStats(opStats);
  }
}
// Closes all operators, reports their stats to the Task and removes this
// Driver from the Task. Does nothing if the Driver no longer references a
// Task. Aborts the process if called while the Driver is neither on thread
// nor terminated.
void Driver::close() {
  if (!task_) {
    // Already closed.
    return;
  }

  const bool callAllowed = isOnThread() || isTerminated();
  if (!callAllowed) {
    LOG(FATAL) << "Driver::close is only allowed from the Driver's thread";
  }

  addStatsToTask();
  for (auto it = operators_.begin(); it != operators_.end(); ++it) {
    (*it)->close();
  }

  // Drop the member reference first; the local keeps the Task alive for the
  // removal call.
  auto task = std::move(task_);
  Task::removeDriver(task, this);
}
// Variant of close() that requires the Driver to be terminated: reports the
// stats, closes the operators and drops the Task reference without calling
// Task::removeDriver().
void Driver::closeByTask() {
  VELOX_CHECK(isTerminated());
  addStatsToTask();
  for (auto it = operators_.begin(); it != operators_.end(); ++it) {
    (*it)->close();
  }
  task_ = nullptr;
}
// Drops this Driver's reference to its Task. Unlike close() and closeByTask(),
// no stats are reported and no operators are closed.
void Driver::disconnectFromTask() {
task_ = nullptr;
}
// Returns true if every operator between the source (index 0) and
// 'aggregation' is an order-preserving filter. Fails if 'aggregation' is not
// found before an operator that breaks this condition or before the end of
// the pipeline.
bool Driver::mayPushdownAggregation(Operator* aggregation) const {
  const auto numOperators = operators_.size();
  // Index 0 is the source operator and is skipped.
  for (size_t position = 1; position < numOperators; ++position) {
    auto* const candidate = operators_[position].get();
    if (candidate == aggregation) {
      return true;
    }
    if (!(candidate->isFilter() && candidate->preservesOrder())) {
      return false;
    }
  }
  VELOX_FAIL(
      "Aggregation operator not found in its Driver: {}",
      aggregation->toString());
}
// Returns the subset of 'channels' for which a dynamic filter produced by
// 'filterSource' could be installed upstream. Each channel is traced towards
// the source through identity projections; it is supported if the operator
// where the trace stops accepts dynamic filters. Fails if 'filterSource' is
// not one of this Driver's operators.
std::unordered_set<ChannelIndex> Driver::canPushdownFilters(
    Operator* FOLLY_NONNULL filterSource,
    const std::vector<ChannelIndex>& channels) const {
  const int numOperators = operators_.size();
  int sourcePosition = -1;
  for (int position = 0; position < numOperators; ++position) {
    if (operators_[position].get() == filterSource) {
      sourcePosition = position;
      break;
    }
  }
  VELOX_CHECK_GE(
      sourcePosition,
      0,
      "Operator not found in its Driver: {}",
      filterSource->toString());

  std::unordered_set<ChannelIndex> supportedChannels;
  for (const auto requestedChannel : channels) {
    auto channel = requestedChannel;
    for (int position = sourcePosition - 1; position >= 0; --position) {
      auto* const upstreamOp = operators_[position].get();

      // The source operator (index 0) ends the walk without consulting its
      // projections.
      std::optional<ChannelIndex> inputChannel;
      if (position > 0) {
        inputChannel =
            getIdentityProjection(upstreamOp->identityProjections(), channel);
      }
      if (inputChannel.has_value()) {
        // Identity projection: continue walking upstream.
        channel = inputChannel.value();
        continue;
      }

      // The filter would have to be installed on this operator. The channel
      // recorded is the one originally requested, not the translated one.
      if (upstreamOp->canAddDynamicFilter()) {
        supportedChannels.emplace(requestedChannel);
      }
      break;
    }
  }
  return supportedChannels;
}
// Returns the first operator whose plan node id equals 'planNodeId', or
// nullptr if there is none.
Operator* FOLLY_NULLABLE
Driver::findOperator(std::string_view planNodeId) const {
  for (auto it = operators_.begin(); it != operators_.end(); ++it) {
    Operator* const candidate = it->get();
    if (candidate->planNodeId() == planNodeId) {
      return candidate;
    }
  }
  return nullptr;
}
// Forwards 'exception' to the Task referenced by the driver context. That
// reference is used rather than 'task_'.
void Driver::setError(std::exception_ptr exception) {
  const auto& owningTask = ctx_->task;
  owningTask->setError(exception);
}
// Renders a short description: whether the Driver is running or blocked (with
// the numeric blocking reason), followed by the description of each operator.
std::string Driver::toString() {
  std::stringstream description;
  description << "{Driver: ";
  if (!state_.isOnThread()) {
    description << "blocked " << static_cast<int>(blockingReason_) << " ";
  } else {
    description << "running ";
  }
  for (const auto& op : operators_) {
    description << op->toString() << " ";
  }
  description << "}";
  return description.str();
}
// Enters a suspended section for 'driver' via Task::enterSuspended(). Fails
// if the Task reports anything other than StopReason::kNone.
SuspendedSection::SuspendedSection(Driver* FOLLY_NONNULL driver)
    : driver_(driver) {
  const auto stopReason = driver->task()->enterSuspended(driver->state());
  if (stopReason == StopReason::kNone) {
    return;
  }
  VELOX_FAIL("Terminate detected when entering suspended section");
}
// Leaves the suspended section via Task::leaveSuspended().
//
// A stop reason other than StopReason::kNone is only logged. Destructors are
// implicitly noexcept, so raising a Velox error here would end in
// std::terminate() instead of unwinding.
// NOTE(review): assumes the declaration in the header is not marked
// noexcept(false) and that no caller relies on an exception from here -
// confirm.
SuspendedSection::~SuspendedSection() {
  if (driver_->task()->leaveSuspended(driver_->state()) != StopReason::kNone) {
    LOG(WARNING) << "Terminate detected when leaving suspended section";
  }
}
// Returns "<Driver taskId:driverId>" for this Driver.
std::string Driver::label() const {
  const auto& taskId = ctx_->task->taskId();
  const auto driverId = ctx_->driverId;
  return fmt::format("<Driver {}:{}>", taskId, driverId);
}
// Returns the name of the enumerator 'reason' as a string. The switch has no
// default label; a value matching none of the listed enumerators reaches
// VELOX_UNREACHABLE().
std::string blockingReasonToString(BlockingReason reason) {
  switch (reason) {
    case BlockingReason::kNotBlocked:
      return "kNotBlocked";
    case BlockingReason::kWaitForConsumer:
      return "kWaitForConsumer";
    case BlockingReason::kWaitForSplit:
      return "kWaitForSplit";
    case BlockingReason::kWaitForExchange:
      return "kWaitForExchange";
    case BlockingReason::kWaitForJoinBuild:
      return "kWaitForJoinBuild";
    case BlockingReason::kWaitForMemory:
      return "kWaitForMemory";
  }
  VELOX_UNREACHABLE();
  return "";
}
} // namespace facebook::velox::exec