util/ThreadsController.cpp (232 lines of code) (raw):

/**
 * Copyright (c) 2014-present, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */
#include <wdt/util/ThreadsController.h>
#include <wdt/WdtOptions.h>

using namespace std;

namespace facebook {
namespace wdt {

// Waits on the guarded condition variable for at most timeoutMillis
// milliseconds. When options.abort_check_interval_millis is positive the wait
// is split into slices of at most that length. Returns as soon as a slice ends
// without timing out, as soon as the abort checker reports an abort after a
// timed-out slice, or once the whole timeout has been consumed. A
// non-positive timeoutMillis returns immediately without waiting.
void ConditionGuardImpl::wait(int timeoutMillis, const ThreadCtx &threadCtx) {
  const WdtOptions &options = threadCtx.getOptions();
  const bool checkAbort = (options.abort_check_interval_millis > 0);
  int remainingTime = timeoutMillis;
  while (remainingTime > 0) {
    int waitTime = remainingTime;
    if (checkAbort) {
      // Never sleep longer than one abort-check interval at a time
      waitTime = std::min(waitTime, options.abort_check_interval_millis);
    }
    const auto waitingTime = chrono::milliseconds(waitTime);
    const auto status = cv_.wait_for(*lock_, waitingTime);
    if (status == std::cv_status::no_timeout) {
      // Woken up before the slice elapsed; let the caller re-check its state
      return;
    }
    // check for abort
    if (threadCtx.getAbortChecker()->shouldAbort()) {
      WLOG(ERROR) << "Transfer aborted during condition guard wait "
                  << threadCtx.getThreadIndex();
      return;
    }
    remainingTime -= waitTime;
  }
}

// Wakes every thread waiting on the guarded condition variable.
void ConditionGuardImpl::notifyAll() {
  cv_.notify_all();
}

// Wakes one thread waiting on the guarded condition variable.
void ConditionGuardImpl::notifyOne() {
  cv_.notify_one();
}

// Destroys the owned lock object, if any. A moved-from guard holds a null
// pointer; deleting a null pointer is a no-op.
ConditionGuardImpl::~ConditionGuardImpl() {
  delete lock_;
}

// Allocates a unique_lock on guardMutex (which locks it) and binds the guard
// to cv.
ConditionGuardImpl::ConditionGuardImpl(mutex &guardMutex,
                                       condition_variable &cv)
    : cv_(cv) {
  lock_ = new unique_lock<mutex>(guardMutex);
}

// Move constructor: takes over the lock object owned by `that` and leaves
// `that` with a null lock pointer so only one destructor deletes it.
ConditionGuardImpl::ConditionGuardImpl(ConditionGuardImpl &&that) noexcept
    : cv_(that.cv_) {
  // Explicit transfer instead of swap(): the result does not depend on lock_
  // having been initialized before this constructor body runs.
  lock_ = that.lock_;
  that.lock_ = nullptr;
}

// Returns a guard constructed from this condition's mutex and condition
// variable.
ConditionGuardImpl ConditionGuard::acquire() {
  return ConditionGuardImpl(mutex_, cv_);
}

// Returns the funnel status. If the status is FUNNEL_START it is switched to
// FUNNEL_PROGRESS under the lock and FUNNEL_START is returned, so only the
// caller that performed the switch observes FUNNEL_START.
FunnelStatus Funnel::getStatus() {
  unique_lock<mutex> lock(mutex_);
  if (status_ == FUNNEL_START) {
    status_ = FUNNEL_PROGRESS;
    return FUNNEL_START;
  }
  return status_;
}

// Blocks on the funnel condition variable if the status is FUNNEL_PROGRESS;
// returns immediately otherwise. The wait is a single cv wait without a
// predicate loop, so callers must re-check the status after it returns.
void Funnel::wait() {
  unique_lock<mutex> lock(mutex_);
  if (status_ != FUNNEL_PROGRESS) {
    return;
  }
  cv_.wait(lock);
}

// Timed variant of wait(): if the status is FUNNEL_PROGRESS, waits through
// ConditionGuardImpl::wait for up to waitingTime milliseconds.
void Funnel::wait(int32_t waitingTime, const ThreadCtx &threadCtx) {
  ConditionGuardImpl guard(mutex_, cv_);
  if (status_ != FUNNEL_PROGRESS) {
    return;
  }
  guard.wait(waitingTime, threadCtx);
}

// Sets the status to FUNNEL_END and wakes all waiting threads.
void Funnel::notifySuccess() {
  unique_lock<mutex> lock(mutex_);
  status_ = FUNNEL_END;
  cv_.notify_all();
}

// Sets the status back to FUNNEL_START and wakes one waiting thread.
void Funnel::notifyFail() {
  unique_lock<mutex> lock(mutex_);
  status_ = FUNNEL_START;
  cv_.notify_one();
}

// Marks the barrier complete and wakes all waiters once the number of hits
// equals the number of participating threads. Returns whether the barrier is
// complete.
bool Barrier::checkForFinish() {
  // lock should be held while calling this method
  WDT_CHECK_GE(numThreads_, numHits_);
  if (numHits_ == numThreads_) {
    isComplete_ = true;
    cv_.notify_all();
  }
  return isComplete_;
}

// Records a hit on the barrier and blocks until the barrier is complete.
// Hitting an already completed barrier fails the WDT_CHECK.
void Barrier::execute() {
  unique_lock<mutex> lock(mutex_);
  WDT_CHECK(!isComplete_) << "Hitting the barrier after completion";
  ++numHits_;
  if (checkForFinish()) {
    return;
  }
  while (!isComplete_) {
    cv_.wait(lock);
  }
}

// Removes one participant from the barrier. Has no effect once the barrier is
// complete; otherwise the reduced thread count may complete the barrier.
void Barrier::deRegister() {
  unique_lock<mutex> lock(mutex_);
  if (isComplete_) {
    return;
  }
  --numThreads_;
  checkForFinish();
}

// Creates a controller for totalThreads threads, indexed 0..totalThreads-1,
// each starting in the INIT state, and creates the start/end execute-once
// helpers.
ThreadsController::ThreadsController(int totalThreads) {
  totalThreads_ = totalThreads;
  for (int threadNum = 0; threadNum < totalThreads; ++threadNum) {
    threadStateMap_[threadNum] = INIT;
  }
  execAtStart_.reset(new ExecuteOnceFunc(totalThreads_, true));
  execAtEnd_.reset(new ExecuteOnceFunc(totalThreads_, false));
}

// Marks the given thread as RUNNING. The index must already be present in the
// state map.
void ThreadsController::registerThread(int threadIndex) {
  GuardLock lock(controllerMutex_);
  auto it = threadStateMap_.find(threadIndex);
  WDT_CHECK(it != threadStateMap_.end());
  // Write through the iterator; no need for a second lookup
  it->second = RUNNING;
}

// Marks the given thread as FINISHED and de-registers it from every barrier.
// The index must already be present in the state map.
void ThreadsController::deRegisterThread(int threadIndex) {
  GuardLock lock(controllerMutex_);
  auto it = threadStateMap_.find(threadIndex);
  WDT_CHECK(it != threadStateMap_.end());
  it->second = FINISHED;
  // Notify all the barriers
  for (const auto &barrier : barriers_) {
    WDT_CHECK(barrier != nullptr);
    barrier->deRegister();
  }
}

// Returns the state of the given thread. The index must be present in the
// state map.
ThreadStatus ThreadsController::getState(int threadIndex) {
  GuardLock lock(controllerMutex_);
  auto it = threadStateMap_.find(threadIndex);
  WDT_CHECK(it != threadStateMap_.end());
  return it->second;
}

// Sets the state of the given thread. Unlike registerThread(), an index that
// is not yet in the map is inserted rather than rejected.
void ThreadsController::markState(int threadIndex, ThreadStatus threadState) {
  GuardLock lock(controllerMutex_);
  threadStateMap_[threadIndex] = threadState;
}

// Returns a copy of the thread-index to state map, taken under the lock.
unordered_map<int, ThreadStatus> ThreadsController::getThreadStates() const {
  GuardLock lock(controllerMutex_);
  return threadStateMap_;
}

// Returns the thread count passed to the constructor.
int ThreadsController::getTotalThreads() {
  return totalThreads_;
}

// Returns true if any thread is in the given state.
bool ThreadsController::hasThreads(ThreadStatus threadState) {
  // Indices assigned by the constructor start at 0, so -1 excludes no thread
  return hasThreads(-1, threadState);
}

// Returns true if any thread other than threadIndex is in the given state.
bool ThreadsController::hasThreads(int threadIndex, ThreadStatus threadState) {
  GuardLock lock(controllerMutex_);
  for (const auto &threadPair : threadStateMap_) {
    if (threadPair.first == threadIndex) {
      continue;
    }
    if (threadPair.second == threadState) {
      return true;
    }
  }
  return false;
}

// Returns the condition guard at conditionIndex. The index must be in range
// and the entry non-null, otherwise the WDT_CHECK fails.
shared_ptr<ConditionGuard> ThreadsController::getCondition(
    const uint64_t conditionIndex) {
  bool isExists = (conditionGuards_.size() > conditionIndex) &&
                  (conditionGuards_[conditionIndex] != nullptr);
  WDT_CHECK(isExists)
      << "Requesting for a condition wrapper that doesn't exist."
      << " Request Index : " << conditionIndex
      << ", num condition wrappers : " << conditionGuards_.size();
  return conditionGuards_[conditionIndex];
}

// Returns the barrier at barrierIndex. The index must be in range and the
// entry non-null, otherwise the WDT_CHECK fails.
shared_ptr<Barrier> ThreadsController::getBarrier(const uint64_t barrierIndex) {
  bool isExists = (barriers_.size() > barrierIndex) &&
                  (barriers_[barrierIndex] != nullptr);
  WDT_CHECK(isExists)
      << "Requesting for a barrier that doesn't exist. Request index : "
      << barrierIndex << ", num barriers " << barriers_.size();
  return barriers_[barrierIndex];
}

// Returns the funnel at funnelIndex. The index must be in range and the entry
// non-null, otherwise the WDT_CHECK fails.
shared_ptr<Funnel> ThreadsController::getFunnel(const uint64_t funnelIndex) {
  bool isExists = (funnelExecutors_.size() > funnelIndex) &&
                  (funnelExecutors_[funnelIndex] != nullptr);
  WDT_CHECK(isExists)
      << "Requesting for a funnel that doesn't exist. Request index : "
      << funnelIndex << ", num funnels " << funnelExecutors_.size();
  return funnelExecutors_[funnelIndex];
}

// Recreates all barriers, conditions and funnels (keeping their counts),
// resets the execute-once helpers and marks every known thread as RUNNING.
void ThreadsController::reset() {
  // Only used in the case of long running mode
  setNumBarriers(barriers_.size());
  setNumConditions(conditionGuards_.size());
  setNumFunnels(funnelExecutors_.size());
  execAtStart_->reset();
  execAtEnd_->reset();
  GuardLock lock(controllerMutex_);
  // Put every thread back into the RUNNING state.
  // NOTE(review): the constructor starts threads in INIT, not RUNNING -
  // confirm RUNNING is the intended post-reset state.
  for (auto &threadPair : threadStateMap_) {
    threadPair.second = RUNNING;
  }
}

// Replaces the current barriers with numBarriers fresh ones, each sized for
// getTotalThreads() threads.
void ThreadsController::setNumBarriers(int numBarriers) {
  // Meant to be called outside of threads
  barriers_.clear();
  for (int i = 0; i < numBarriers; i++) {
    barriers_.push_back(make_shared<Barrier>(getTotalThreads()));
  }
}

// Replaces the current condition guards with numConditions fresh ones.
void ThreadsController::setNumConditions(int numConditions) {
  conditionGuards_.clear();
  for (int i = 0; i < numConditions; i++) {
    conditionGuards_.push_back(make_shared<ConditionGuard>());
  }
}

// Replaces the current funnels with numFunnels fresh ones.
void ThreadsController::setNumFunnels(int numFunnels) {
  funnelExecutors_.clear();
  for (int i = 0; i < numFunnels; i++) {
    funnelExecutors_.push_back(make_shared<Funnel>());
  }
}

// Returns the number of threads currently in the RUNNING state.
int ThreadsController::numRunningThreads() {
  GuardLock lock(controllerMutex_);
  int ret = 0;
  for (const auto &p : threadStateMap_) {
    if (p.second == RUNNING) {
      ret++;
    }
  }
  return ret;
}
}  // namespace wdt
}  // namespace facebook