velox/exec/Driver.h (217 lines of code) (raw):

/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <folly/executors/CPUThreadPoolExecutor.h> #include <folly/futures/Future.h> #include <folly/portability/SysSyscall.h> #include "velox/common/future/VeloxPromise.h" #include "velox/connectors/Connector.h" #include "velox/core/PlanNode.h" #include "velox/core/QueryCtx.h" namespace facebook::velox::exec { class Driver; class ExchangeClient; class Operator; struct OperatorStats; class Task; enum class StopReason { // Keep running. kNone, // Go off thread and do not schedule more activity. kPause, // Stop and free all. This is returned once and the thread that gets // this value is responsible for freeing the state associated with // the thread. Other threads will get kAlreadyTerminated after the // first thread has received kTerminate. kTerminate, kAlreadyTerminated, // Go off thread and then enqueue to the back of the runnable queue. kYield, // Must wait for external events. kBlock, // No more data to produce. kAtEnd, kAlreadyOnThread }; // Represents a Driver's state. This is used for cancellation, forcing // release of and for waiting for memory. The fields are serialized on // the mutex of the Driver's Task. // // The Driver goes through the following states: // Not on thread. It is created and has not started. All flags are false. // // Enqueued - The Driver is added to an executor but does not yet have a thread. // isEnqueued is true. Next states are terminated or on thread. // // On thread - 'thread' is set to the thread that is running the Driver. Next // states are blocked, terminated, suspended, enqueued. // // Blocked - The Driver is not on thread and is waiting for an external event. // Next states are terminated, enqueued. // // Suspended - The Driver is on thread, 'thread' and 'isSuspended' are set. The // thread does not manipulate the Driver's state and is suspended as in waiting // for memory or out of process IO. This is different from Blocked in that here // we keep the stack so that when the wait is over the control stack is not // lost. Next states are on thread or terminated. // // Terminated - 'isTerminated' is set. The Driver cannot run after this and // the state is final. // // CancelPool allows terminating or pausing a set of Drivers. The Task API // allows starting or resuming Drivers. When terminate is requested the request // is successful when all Drivers are off thread, blocked or suspended. When // pause is requested, we have success when all Drivers are either enqueued, // suspended, off thread or blocked. struct ThreadState { // The thread currently running this. std::atomic<std::thread::id> thread{}; // The tid of 'thread'. Allows finding the thread in a debugger. std::atomic<int32_t> tid{0}; // True if queued on an executor but not on thread. std::atomic<bool> isEnqueued{false}; // True if being terminated or already terminated. std::atomic<bool> isTerminated{false}; // True if there is a future outstanding that will schedule this on an // executor thread when some promise is realized. bool hasBlockingFuture{false}; // True if on thread but in a section waiting for RPC or memory // strategy decision. The thread is not supposed to access its // memory, which a third party can revoke while the thread is in // this state. bool isSuspended{false}; bool isOnThread() const { return thread != std::thread::id(); } void setThread() { thread = std::this_thread::get_id(); #if !defined(__APPLE__) // This is a debugging feature disabled on the Mac since syscall // is deprecated on that platform. tid = syscall(FOLLY_SYS_gettid); #endif } void clearThread() { thread = std::thread::id(); // no thread. tid = 0; } }; enum class BlockingReason { kNotBlocked, kWaitForConsumer, kWaitForSplit, kWaitForExchange, kWaitForJoinBuild, kWaitForMemory }; std::string blockingReasonToString(BlockingReason reason); using ContinueFuture = folly::SemiFuture<bool>; class BlockingState { public: BlockingState( std::shared_ptr<Driver> driver, ContinueFuture&& future, Operator* FOLLY_NONNULL op, BlockingReason reason); static void setResume(std::shared_ptr<BlockingState> state); Operator* FOLLY_NONNULL op() { return operator_; } BlockingReason reason() { return reason_; } private: std::shared_ptr<Driver> driver_; ContinueFuture future_; Operator* FOLLY_NONNULL operator_; BlockingReason reason_; uint64_t sinceMicros_; }; struct DriverCtx { const int driverId; const int pipelineId; /// Id of the split group this driver should process in case of grouped /// execution, zero otherwise. const uint32_t splitGroupId; /// Id of the partition to use by this driver. For local exchange, for /// instance. const uint32_t partitionId; std::shared_ptr<Task> task; memory::MemoryPool* FOLLY_NONNULL pool; Driver* FOLLY_NONNULL driver; explicit DriverCtx( std::shared_ptr<Task> _task, int _driverId, int _pipelineId, uint32_t _splitGroupId, uint32_t _partitionId); const core::QueryConfig& queryConfig() const; velox::memory::MemoryPool* FOLLY_NONNULL addOperatorPool(); }; class Driver { public: Driver( std::unique_ptr<DriverCtx> driverCtx, std::vector<std::unique_ptr<Operator>>&& operators); ~Driver(); static void run(std::shared_ptr<Driver> self); static void enqueue(std::shared_ptr<Driver> instance); bool isOnThread() const { return state_.isOnThread(); } bool isTerminated() const { return state_.isTerminated; } std::string label() const; ThreadState& state() { return state_; } void initializeOperatorStats(std::vector<OperatorStats>& stats); void addStatsToTask(); // Returns true if all operators between the source and 'aggregation' are // order-preserving and do not increase cardinality. bool mayPushdownAggregation(Operator* FOLLY_NONNULL aggregation) const; // Returns a subset of channels for which there are operators upstream from // filterSource that accept dynamically generated filters. std::unordered_set<ChannelIndex> canPushdownFilters( Operator* FOLLY_NONNULL filterSource, const std::vector<ChannelIndex>& channels) const; // Returns the Operator with 'planNodeId.' or nullptr if not // found. For example, hash join probe accesses the corresponding // build by id. Operator* FOLLY_NULLABLE findOperator(std::string_view planNodeId) const; void setError(std::exception_ptr exception); std::string toString(); DriverCtx* FOLLY_NONNULL driverCtx() const { return ctx_.get(); } std::shared_ptr<Task> task() const { return task_; } // Updates the stats in 'task_' and frees resources. Only called by Task for // closing non-running Drivers. void closeByTask(); // This is called if the creation of drivers failed and we want to disconnect // driver from the task before driver's destruction. void disconnectFromTask(); private: void enqueueInternal(); StopReason runInternal( std::shared_ptr<Driver>& self, std::shared_ptr<BlockingState>* FOLLY_NONNULL blockingState); void close(); // Push down dynamic filters produced by the operator at the specified // position in the pipeline. void pushdownFilters(int operatorIndex); std::unique_ptr<DriverCtx> ctx_; std::shared_ptr<Task> task_; // Set via Task_ and serialized by 'task_'s mutex. ThreadState state_; // Timer used to track down the time we are sitting in the driver queue. size_t queueTimeStartMicros_{0}; // Index of the current operator to run (or the 1st one if we haven't started // yet). Used to determine which operator's queueTime we should update. size_t curOpIndex_{0}; std::vector<std::unique_ptr<Operator>> operators_; BlockingReason blockingReason_{BlockingReason::kNotBlocked}; }; using OperatorSupplier = std::function<std::unique_ptr<Operator>( int32_t operatorId, DriverCtx* FOLLY_NONNULL ctx)>; using Consumer = std::function<BlockingReason(RowVectorPtr, ContinueFuture* FOLLY_NULLABLE)>; using ConsumerSupplier = std::function<Consumer()>; struct DriverFactory { std::vector<std::shared_ptr<const core::PlanNode>> planNodes; /// Function that will generate the final operator of a driver being /// constructed. OperatorSupplier consumerSupplier; /// Maximum number of drivers that can be run concurrently in this pipeline. uint32_t maxDrivers; /// Number of drivers that will be run concurrently in this pipeline for one /// split group (during grouped execution) or for the whole task (ungrouped /// execution). uint32_t numDrivers; /// Total number of drivers in this pipeline we expect to be run. In case of /// grouped execution it is 'numDrivers' * 'numSplitGroups', otherwise it is /// 'numDrivers'. uint32_t numTotalDrivers; /// The (local) node that will consume results supplied by this pipeline. /// Can be null. We use that to determine the max drivers. std::shared_ptr<const core::PlanNode> consumerNode; // True if 'planNodes' contains a source node for the task, e.g. TableScan or // Exchange. bool inputDriver{false}; // True if 'planNodes' contains a sync node for the task, e.g. // PartitionedOutput. bool outputDriver{false}; std::shared_ptr<Driver> createDriver( std::unique_ptr<DriverCtx> ctx, std::shared_ptr<ExchangeClient> exchangeClient, std::function<int(int pipelineId)> numDrivers); std::shared_ptr<const core::PartitionedOutputNode> needsPartitionedOutput() { VELOX_CHECK(!planNodes.empty()); if (auto partitionedOutputNode = std::dynamic_pointer_cast<const core::PartitionedOutputNode>( planNodes.back())) { return partitionedOutputNode; } return nullptr; } bool needsExchangeClient() const { VELOX_CHECK(!planNodes.empty()); if (auto exchangeNode = std::dynamic_pointer_cast<const core::ExchangeNode>( planNodes.front())) { return true; } return false; } /// Returns LocalPartition plan node ID if the pipeline gets data from a local /// exchange. std::optional<core::PlanNodeId> needsLocalExchangeSource() const { VELOX_CHECK(!planNodes.empty()); if (auto exchangeNode = std::dynamic_pointer_cast<const core::LocalPartitionNode>( planNodes.front())) { return exchangeNode->id(); } return std::nullopt; } /// Returns plan node IDs of all HashJoinNode's in the pipeline. std::vector<core::PlanNodeId> needsHashJoinBridges() const { std::vector<core::PlanNodeId> planNodeIds; for (const auto& planNode : planNodes) { if (auto joinNode = std::dynamic_pointer_cast<const core::HashJoinNode>(planNode)) { planNodeIds.emplace_back(joinNode->id()); } } return planNodeIds; } /// Returns plan node IDs of all CrossJoinNode's in the pipeline. std::vector<core::PlanNodeId> needsCrossJoinBridges() const { std::vector<core::PlanNodeId> joinNodeIds; for (const auto& planNode : planNodes) { if (auto joinNode = std::dynamic_pointer_cast<const core::CrossJoinNode>(planNode)) { joinNodeIds.emplace_back(joinNode->id()); } } return joinNodeIds; } }; // Begins and ends a section where a thread is running but not // counted in its Task. Using this, a Driver thread can for // example stop its own Task. For arbitrating memory overbooking, // the contending threads go suspended and each in turn enters a // global critical section. When running the arbitration strategy, a // thread can stop and restart Tasks, including its own. When a Task // is stopped, its drivers are blocked or suspended and the strategy thread // can alter the Task's memory including spilling or killing the whole Task. // Other threads waiting to run the arbitration, are in a suspended state // which also means that they are instantaneously killable or spillable. class SuspendedSection { public: explicit SuspendedSection(Driver* FOLLY_NONNULL driver); ~SuspendedSection(); private: Driver* FOLLY_NONNULL driver_; }; } // namespace facebook::velox::exec