util/ThreadsController.h (156 lines of code) (raw):

/** * Copyright (c) 2014-present, Facebook, Inc. * All rights reserved. * * This source code is licensed under the BSD-style license found in the * LICENSE file in the root directory of this source tree. */ #pragma once #include <wdt/ErrorCodes.h> #include <wdt/WdtThread.h> #include <condition_variable> #include <memory> #include <mutex> #include <unordered_map> #include <vector> namespace facebook { namespace wdt { class WdtThread; /** * Thread states that represent what kind of functionality * are they executing on a higher level. * INIT - State before running at the time of construction * RUNNING - Thread is running without any errors * WAITING - Thread is not doing anything meaninful but * rather waiting on other threads for something * FINISHED - Threads have finished with/without error */ enum ThreadStatus { INIT, RUNNING, WAITING, FINISHED }; /** * Primitive that takes a function and executes * it only once either on the first thread * or the last thread entrance */ class ExecuteOnceFunc { public: /// Constructor for the once only executor ExecuteOnceFunc(int numThreads, bool execFirst) { execFirst_ = execFirst; numThreads_ = numThreads; } /// Deleted copy constructor ExecuteOnceFunc(const ExecuteOnceFunc &that) = delete; /// Deleted assignment operator ExecuteOnceFunc &operator=(const ExecuteOnceFunc &that) = delete; /// Implements the main functionality of the executor template <typename Func> void execute(Func &&execFunc) { std::unique_lock<std::mutex> lock(mutex_); ++numHits_; WDT_CHECK(numHits_ <= numThreads_); int64_t numExpected = (execFirst_) ? 1 : numThreads_; if (numHits_ == numExpected) { execFunc(); } } /// Reset the number of hits void reset() { numHits_ = 0; } private: /// Mutex for thread synchronization std::mutex mutex_; /// Number of times execute has been called int numHits_{0}; /// Function can be executed on the first /// thread or the last thread bool execFirst_{true}; /// Number of total threads int numThreads_; }; /** * A scoped locking primitive. When you get this object * it means that you already have the lock. You can also * wait, notify etc using this primitive */ class ConditionGuardImpl { public: /// Release the lock and wait for the timeout /// After the wait is over, lock is reacquired void wait(int timeoutMillis, const ThreadCtx &threadCtx); /// Notify all the threads waiting on the lock void notifyAll(); /// Notify one thread waiting on the lock void notifyOne(); /// Delete the copy constructor ConditionGuardImpl(const ConditionGuardImpl &that) = delete; /// Delete the copy assignment operator ConditionGuardImpl &operator=(const ConditionGuardImpl &that) = delete; /// Move constructor for the guard ConditionGuardImpl(ConditionGuardImpl &&that) noexcept; /// Move assignment operator deleted ConditionGuardImpl &operator=(ConditionGuardImpl &&that) = delete; /// Destructor that releases the lock, you would explicitly /// need to notify any other threads waiting in the wait() ~ConditionGuardImpl(); protected: friend class ConditionGuard; friend class Funnel; /// Constructor that takes the shared mutex and condition /// variable ConditionGuardImpl(std::mutex &mutex, std::condition_variable &cv); /// Instance of lock is made on construction with the specified mutex std::unique_lock<std::mutex> *lock_{nullptr}; /// Shared condition variable std::condition_variable &cv_; }; /** * Class for simplifying the primitive to take a lock * in conjunction with the ability to do things * on a condition variable based on the lock. * Use the condition guard like this * ConditionGuard condition; * auto guard = condition.acquire(); * guard.wait(); */ class ConditionGuard { public: /// Caller has to call acquire before doing anything ConditionGuardImpl acquire(); /// Default constructor ConditionGuard() { } /// Deleted copy constructor ConditionGuard(const ConditionGuard &that) = delete; /// Deleted assignment operator ConditionGuard &operator=(const ConditionGuard &that) = delete; private: /// Mutex for the condition variable std::mutex mutex_; /// std condition variable to support the functionality std::condition_variable cv_; }; /** * A barrier primitive. When called for executing * will block the threads till all the threads registered * call execute() */ class Barrier { public: /// Deleted copy constructor Barrier(const Barrier &that) = delete; /// Deleted assignment operator Barrier &operator=(const Barrier &that) = delete; /// Constructor which takes total number of threads /// to be hit in order for the barrier to clear explicit Barrier(int numThreads) { numThreads_ = numThreads; WVLOG(1) << "making barrier with " << numThreads; } /// Executes the main functionality of the barrier void execute(); /** * Thread controller should call this method when one thread * has been finished, since that thread will no longer be * participating in the barrier */ void deRegister(); private: /// Checks for finish, need to hold a lock to call this method bool checkForFinish(); /// Condition variable that threads wait on std::condition_variable cv_; /// Number of threads entered the execute int64_t numHits_{0}; /// Total number of threads that are supposed /// to hit the barrier int numThreads_{0}; /// Thread synchronization mutex std::mutex mutex_; /// Represents the completion of barrier bool isComplete_{false}; }; /** * Different stages of the simple funnel * FUNNEL_START the state of funnel at the beginning * FUNNEL_PROGRESS is set by the first thread to enter the funnel * and it means that funnel functionality is in progress * FUNNEL_END means that funnel functionality has been executed */ enum FunnelStatus { FUNNEL_START, FUNNEL_PROGRESS, FUNNEL_END }; /** * Primitive that makes the threads execute in a funnel * manner. Only one thread gets to execute the main functionality * while other entering threads wait (while executing a function) */ class Funnel { public: /// Deleted copy constructor Funnel(const Funnel &that) = delete; /// Default constructor for funnel Funnel() { status_ = FUNNEL_START; } /// Deleted assignment operator Funnel &operator=(const Funnel &that) = delete; /** * Get the current status of funnel. * If the status is FUNNEL_START it gets set * to FUNNEL_PROGRESS else it is just a get */ FunnelStatus getStatus(); /// Threads in progress can wait indefinitely void wait(); /// Threads that get status as progress execute this function void wait(int32_t waitingTime, const ThreadCtx &threadCtx); /** * The first thread that was able to start the funnel * calls this method on successful execution */ void notifySuccess(); /// The first thread that was able to start the funnel /// calls this method on failure in execution void notifyFail(); private: /// Status of the funnel FunnelStatus status_; /// Mutex for the simple funnel executor std::mutex mutex_; /// Condition variable on which progressing threads wait std::condition_variable cv_; }; /** * Controller class responsible for the receiver * and sender threads. Manages the states of threads and * session information */ class ThreadsController { public: /// Constructor that takes in the total number of threads /// to be run explicit ThreadsController(int totalThreads); /// Make threads of a type Sender/Receiver template <typename WdtBaseType, typename WdtThreadType> std::vector<std::unique_ptr<WdtThread>> makeThreads( WdtBaseType *wdtParent, int numThreads, const std::vector<int32_t> &ports) { std::vector<std::unique_ptr<WdtThread>> threads; for (int threadIndex = 0; threadIndex < numThreads; ++threadIndex) { threads.emplace_back(std::make_unique<WdtThreadType>( wdtParent, threadIndex, ports[threadIndex], this)); } return threads; } /// Mark the state of a thread void markState(int threadIndex, ThreadStatus state); /// Get the status of the thread by index ThreadStatus getState(int threadIndex); /// Execute a function func once, by the first thread template <typename FunctionType> void executeAtStart(FunctionType &&fn) const { execAtStart_->execute(fn); } /// Execute a function once by the last thread template <typename FunctionType> void executeAtEnd(FunctionType &&fn) const { execAtEnd_->execute(fn); } /// Returns a funnel executor shared between the threads /// If the executor does not exist then it creates one std::shared_ptr<Funnel> getFunnel(uint64_t funnelIndex); /// Returns a barrier shared between the threads /// If the executor does not exist then it creates one std::shared_ptr<Barrier> getBarrier(uint64_t barrierIndex); /// Get the condition variable wrapper std::shared_ptr<ConditionGuard> getCondition(uint64_t conditionIndex); /* * Returns back states of all the threads */ std::unordered_map<int, ThreadStatus> getThreadStates() const; /// Register a thread, a thread registers with the state RUNNING void registerThread(int threadIndex); /// De-register a thread, marks it ended void deRegisterThread(int threadIndex); /// Returns true if any thread apart from the calling is in the state bool hasThreads(int threadIndex, ThreadStatus threadState); /// @return true if any registered thread is in the state bool hasThreads(ThreadStatus threadState); /// Get the nunber of registered threads int getTotalThreads(); /// Get the number of threads with status RUNNING int numRunningThreads(); /// Reset the thread controller so that same instance can be used again void reset(); /// Set the total number of barriers void setNumBarriers(int numBarriers); /// Set the number of condition wrappers void setNumConditions(int numConditions); /// Set total number of funnel executors void setNumFunnels(int numFunnels); /// Destructor for the threads controller ~ThreadsController() { } private: /// Total number of threads managed by the thread controller int totalThreads_; typedef std::unique_lock<std::mutex> GuardLock; /// Mutex used in all of the thread controller methods mutable std::mutex controllerMutex_; /// States of the threads std::unordered_map<int, ThreadStatus> threadStateMap_; /// Executor to execute things at the start of transfer std::unique_ptr<ExecuteOnceFunc> execAtStart_; /// Executor to execute things at the end of transfer std::unique_ptr<ExecuteOnceFunc> execAtEnd_; /// Vector of funnel executors, read/modified by get/set funnel methods std::vector<std::shared_ptr<Funnel>> funnelExecutors_; /// Vector of condition wrappers, read/modified by get/set condition methods std::vector<std::shared_ptr<ConditionGuard>> conditionGuards_; /// Vector of barriers, can be read/modified by get/set barrier methods std::vector<std::shared_ptr<Barrier>> barriers_; }; } }