in dispenso/thread_pool.h [42:160]
// A work-stealing-style thread pool. The class itself is cache-line aligned (and, pre-C++17,
// allocated via the aligned operator new below) to avoid false sharing between a pool instance
// and neighboring heap objects.
class alignas(kCacheLineSize) ThreadPool {
public:
/**
 * Construct a thread pool.
 *
 * @param n The number of threads to spawn at construction.
 * @param poolLoadMultiplier A parameter that specifies how overloaded the pool should be before
 * allowing the current thread to self-steal work.
 **/
ThreadPool(size_t n, size_t poolLoadMultiplier = 32)
: poolLoadMultiplier_(poolLoadMultiplier),
// Inline-execution threshold scales with thread count; see poolLoadFactor_ below.
poolLoadFactor_(static_cast<ssize_t>(n * poolLoadMultiplier)),
numThreads_(static_cast<ssize_t>(n)) {
#if defined DISPENSO_DEBUG
// A zero multiplier would make the load factor zero, forcing all work inline.
assert(poolLoadMultiplier > 0);
#endif // DISPENSO_DEBUG
for (size_t i = 0; i < n; ++i) {
threads_.emplace_back();
// std::deque never invalidates references to existing elements on emplace_back, so it is
// safe to hand a reference to back.running into the thread we are about to launch.
auto& back = threads_.back();
// Order matters: 'running' must be true before the thread starts executing threadLoop,
// which presumably polls this flag to decide when to exit (defined in the .cpp).
back.running = true;
back.thread = std::thread([this, &running = back.running]() { threadLoop(running); });
}
}
/**
 * Change the number of threads backing the thread pool. This is a blocking and potentially slow
 * operation, and repeatedly resizing is discouraged.
 *
 * @param n The number of threads in use after call completion
 **/
DISPENSO_DLL_ACCESS void resize(ssize_t n);
/**
 * Get the number of threads backing the pool. If called concurrently to <code>resize</code>, the
 * number returned may be stale.
 *
 * @return The current number of threads backing the pool.
 **/
ssize_t numThreads() const {
// Relaxed load is sufficient: callers are documented to tolerate a stale value.
return numThreads_.load(std::memory_order_relaxed);
}
/**
 * Schedule a functor to be executed. If the pool's load factor is high, execution may happen
 * inline by the calling thread.
 *
 * @param f The functor to be executed. <code>f</code>'s signature must match void(). Best
 * performance will come from passing lambdas, other concrete functors, or OnceFunction, but
 * std::function or similarly type-erased objects will also work.
 **/
template <typename F>
void schedule(F&& f);
/**
 * Schedule a functor to be executed. The functor will always be queued and executed by pool
 * threads.
 *
 * @param f The functor to be executed. <code>f</code>'s signature must match void(). Best
 * performance will come from passing lambdas, other concrete functors, or OnceFunction, but
 * std::function or similarly type-erased objects will also work.
 **/
template <typename F>
void schedule(F&& f, ForceQueuingTag);
/**
 * Destruct the pool. This destructor is blocking until all queued work is completed. It is
 * illegal to call the destructor while any other thread makes calls to the pool (as is generally
 * the case with C++ classes).
 **/
DISPENSO_DLL_ACCESS ~ThreadPool();
private:
// Run one unit of work (implementation in the .cpp).
void executeNext(OnceFunction work);
// Per-thread main loop; 'running' is that thread's shutdown flag (set up in the constructor).
DISPENSO_DLL_ACCESS void threadLoop(std::atomic<bool>& running);
// Attempt to dequeue and run one item; returns whether anything was executed.
bool tryExecuteNext();
// Same, but consuming from a specific producer's sub-queue via its token.
bool tryExecuteNextFromProducerToken(moodycamel::ProducerToken& token);
// Token-based scheduling overloads used internally (e.g. by the friended TaskSet classes) to
// get cheaper enqueue paths from moodycamel::ConcurrentQueue.
template <typename F>
void schedule(moodycamel::ProducerToken& token, F&& f);
template <typename F>
void schedule(moodycamel::ProducerToken& token, F&& f, ForceQueuingTag);
public:
// If we are not yet C++17, we provide aligned new/delete to avoid false sharing.
#if __cplusplus < 201703L
static void* operator new(size_t sz) {
return detail::alignedMalloc(sz);
}
static void operator delete(void* ptr) {
return detail::alignedFree(ptr);
}
#endif // __cplusplus
private:
struct PerThreadData {
// Aligning the first member pads each entry to its own cache line, keeping one thread's
// 'running' flag from false-sharing with a neighbor's.
alignas(kCacheLineSize) std::thread thread;
std::atomic<bool> running;
};
// Guards threads_ (presumably taken by resize(); the constructor runs single-threaded).
mutable std::mutex threadsMutex_;
// std::deque chosen for reference stability: elements never move on emplace_back.
std::deque<PerThreadData> threads_;
// Saved so resize() can recompute poolLoadFactor_ for a new thread count.
size_t poolLoadMultiplier_;
// Threshold (numThreads * multiplier) compared against outstanding work to decide whether
// schedule() should execute inline rather than enqueue.
std::atomic<ssize_t> poolLoadFactor_;
std::atomic<ssize_t> numThreads_;
// Lock-free MPMC queue of pending work items.
moodycamel::ConcurrentQueue<OnceFunction> work_;
// Count of not-yet-completed items; on its own cache line since every schedule/execute
// touches it. Maintained in the .cpp — TODO confirm exact increment/decrement points.
alignas(kCacheLineSize) std::atomic<ssize_t> workRemaining_{0};
#if defined DISPENSO_DEBUG
// Debug-only sanity counter for live task sets (see friended classes below).
alignas(kCacheLineSize) std::atomic<ssize_t> outstandingTaskSets_{0};
#endif // DISPENSO_DEBUG
// TaskSet types use the private token-based schedule overloads and debug counters.
friend class ConcurrentTaskSet;
friend class TaskSet;
};