in include/SpartaWorkQueue.h [259:323]
void SpartaWorkQueue<Input, Executor>::run_all() {
  // Processes the queued work on m_num_threads worker threads and returns
  // only after every one of those threads has been joined. In debug builds
  // it then asserts that each per-thread queue is empty.

  // Reset the shared bookkeeping before any worker exists.
  // NOTE(review): take_all() presumably discards whatever the waiter still
  // holds from an earlier run -- confirm against the waiter implementation.
  m_state_counters.num_non_empty = 0;
  m_state_counters.num_running = 0;
  m_state_counters.waiter->take_all();

  // Body of each worker thread. `self` is the state owned by this worker and
  // `self_idx` is its position in m_states.
  auto worker_loop = [&](SpartaWorkerState<Input>* self, size_t self_idx) {
    // Order in which this worker visits the per-thread states when it looks
    // for a task.
    auto visit_order =
        workqueue_impl::create_permutation(m_num_threads, self_idx);

    for (;;) {
      // Execute the first task that any state hands out, then start over.
      bool ran_task = false;
      for (auto victim_idx : visit_order) {
        SpartaWorkerState<Input>* victim = m_states[victim_idx].get();
        auto candidate = victim->pop_task(self);
        if (!candidate) {
          continue;
        }
        consume(self, *candidate);
        ran_task = true;
        break;
      }

      if (!ran_task) {
        // Nothing was found anywhere: this worker is idle from now on.
        self->set_running(false);

        if (!m_can_push_task) {
          // No new task can be added any more, so there is no reason to wait
          // for the jobs that are still running to finish.
          return;
        }

        // Once no worker is running and no queue holds a task, the whole run
        // is over and this thread may quit.
        const bool everything_idle = m_state_counters.num_running == 0 &&
                                     m_state_counters.num_non_empty == 0;
        if (everything_idle) {
          // Wake up every worker that might still be waiting so that it can
          // quit as well.
          m_state_counters.waiter->give(m_state_counters.num_all);
          return;
        }

        // Block until there is work (or until another worker wakes us up).
        m_state_counters.waiter->take();
      }
    }
  };

  // Record how many queues already contain work before the workers start.
  for (size_t queue_idx = 0; queue_idx != m_num_threads; ++queue_idx) {
    if (m_states[queue_idx]->m_queue.empty()) {
      continue;
    }
    ++m_state_counters.num_non_empty;
  }

  // Launch one thread per state, each with an 8 MiB stack.
  constexpr size_t kWorkerStackBytes = 8 * 1024 * 1024;
  std::vector<boost::thread> workers;
  workers.reserve(m_num_threads);
  for (size_t thread_idx = 0; thread_idx != m_num_threads; ++thread_idx) {
    boost::thread::attributes thread_attrs;
    thread_attrs.set_stack_size(kWorkerStackBytes);
    workers.emplace_back(
        thread_attrs,
        std::bind<void>(worker_loop, m_states[thread_idx].get(), thread_idx));
  }

  // Wait for every worker to finish.
  for (auto& running_worker : workers) {
    running_worker.join();
  }

  // Sanity check (debug builds only): no queue may still hold a task.
  for (size_t queue_idx = 0; queue_idx != m_num_threads; ++queue_idx) {
    assert(m_states[queue_idx]->m_queue.empty());
  }
}