inline ThreadPool::ThreadPool()

in async_simple/util/ThreadPool.h [94:164]


inline ThreadPool::ThreadPool(size_t threadNum, bool enableWorkSteal,
                              bool enableCoreBindings)
    : _threadNum(threadNum ? threadNum : std::thread::hardware_concurrency()),
      _queues(_threadNum),
      _stop(false),
      _enableWorkSteal(enableWorkSteal),
      _enableCoreBindings(enableCoreBindings) {
    auto worker = [this](size_t id) {
        auto current = getCurrent();
        current->first = id;
        current->second = this;
        while (true) {
            WorkItem workerItem = {};
            if (_enableWorkSteal) {
                // Try to do work steal firstly.
                for (auto n = 0; n < _threadNum * 2; ++n) {
                    if (_queues[(id + n) % _threadNum].try_pop_if(
                            workerItem,
                            [](auto &item) { return item.canSteal; }))
                        break;
                }
            }

            if (!workerItem.fn && !_queues[id].pop(workerItem)) {
                // If thread is going to stop, don't wait for any new task any
                // more. Otherwise wait for a pop task if _enableWorkSteal false
                // or work steal failed,
                if (_stop)
                    break;
                else
                    continue;
            }

            if (workerItem.fn)
                workerItem.fn();
        }
    };

    _threads.reserve(_threadNum);

#ifdef __linux__
    // Since the CPU IDs might not start at 0 and might not be continuous
    // in the containers,
    // we need to get the available cpus at first.
    std::vector<uint32_t> cpu_ids;
    if (_enableCoreBindings)
        getCurrentCpus(cpu_ids);
#else
    // Avoid unused member warning.
    // [[maybe_unused]] in non-static data members is ignored in GCC.
    (void)_enableCoreBindings;
#endif

    for (auto i = 0; i < _threadNum; ++i) {
        _threads.emplace_back(worker, i);

#ifdef __linux__
        if (!_enableCoreBindings)
            continue;

        // Run threads per core.
        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        CPU_SET(cpu_ids[i % cpu_ids.size()], &cpuset);
        int rc = sched_setaffinity(_threads[i].native_handle(),
                                   sizeof(cpu_set_t), &cpuset);
        if (rc != 0)
            std::cerr << "Error calling sched_setaffinity: " << rc << "\n";
#endif
    }
}