in async_simple/util/ThreadPool.h [94:164]
inline ThreadPool::ThreadPool(size_t threadNum, bool enableWorkSteal,
bool enableCoreBindings)
: _threadNum(threadNum ? threadNum : std::thread::hardware_concurrency()),
_queues(_threadNum),
_stop(false),
_enableWorkSteal(enableWorkSteal),
_enableCoreBindings(enableCoreBindings) {
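    // Each worker records its id and owning pool via getCurrent(), then
    // loops: obtain a task (optionally by stealing from other queues) and
    // run it, exiting once the pool is stopping and no task is available.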
auto worker = [this](size_t id) {
auto current = getCurrent();
current->first = id;
current->second = this;
while (true) {
WorkItem workerItem = {};
if (_enableWorkSteal) {
                // Try to steal a task first: scan every queue up to twice,
                // starting with this worker's own, and take the first
                // stealable item found.
                for (size_t n = 0; n < _threadNum * 2; ++n) {
if (_queues[(id + n) % _threadNum].try_pop_if(
workerItem,
[](auto &item) { return item.canSteal; }))
break;
}
}
if (!workerItem.fn && !_queues[id].pop(workerItem)) {
                // Nothing was stolen (or stealing is disabled) and the
                // blocking pop() on this worker's own queue returned no
                // task. If the pool is stopping, don't wait for new tasks
                // any more; otherwise retry.
if (_stop)
break;
else
continue;
}
if (workerItem.fn)
workerItem.fn();
}
};
_threads.reserve(_threadNum);
#ifdef __linux__
    // CPU IDs might not start at 0 and might not be contiguous inside
    // containers, so query the set of available CPUs first.
std::vector<uint32_t> cpu_ids;
if (_enableCoreBindings)
getCurrentCpus(cpu_ids);
#else
    // Avoid an unused-member warning: [[maybe_unused]] on non-static data
    // members is ignored by GCC.
(void)_enableCoreBindings;
#endif
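    // Spawn the worker threads and, on Linux, optionally pin each one to an
    // available core.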
    for (size_t i = 0; i < _threadNum; ++i) {
_threads.emplace_back(worker, i);
#ifdef __linux__
if (!_enableCoreBindings)
continue;
        // Bind this worker thread to one of the available cores, wrapping
        // around when there are more threads than cores.
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpu_ids[i % cpu_ids.size()], &cpuset);
        int rc = pthread_setaffinity_np(_threads[i].native_handle(),
                                        sizeof(cpu_set_t), &cpuset);
        if (rc != 0)
            std::cerr << "Error calling pthread_setaffinity_np: " << rc
                      << "\n";
#endif
}
}
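
// Illustrative construction sketch (not part of the original header). A
// thread count of 0 falls back to std::thread::hardware_concurrency(); the
// second and third arguments enable work stealing and, on Linux, core
// binding. Tasks are assumed to be posted through the pool's scheduling
// interface (scheduleById() elsewhere in this header):
//
//     ThreadPool pool(/*threadNum=*/0,
//                     /*enableWorkSteal=*/true,
//                     /*enableCoreBindings=*/false);
//     pool.scheduleById([] { /* runs on one of the worker threads */ });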