void SpartaWorkQueue::run_all()

in include/SpartaWorkQueue.h [259:323]


void SpartaWorkQueue<Input, Executor>::run_all() {
  // Runs the queued work on m_num_threads worker threads and blocks until
  // every worker has returned. Each worker repeatedly pops a task from one of
  // the per-thread queues (its probing order comes from
  // workqueue_impl::create_permutation) and hands it to consume().

  // Start from a clean slate: zero both counters and call take_all() on the
  // waiter (presumably discarding signals left from an earlier run -- confirm
  // against the waiter's implementation).
  m_state_counters.num_non_empty = 0;
  m_state_counters.num_running = 0;
  m_state_counters.waiter->take_all();

  // Body executed by each worker thread. `self_state` is the state object
  // associated with this worker, `self_idx` its index in m_states.
  auto worker_loop = [&](SpartaWorkerState<Input>* self_state,
                         size_t self_idx) {
    auto probe_order =
        workqueue_impl::create_permutation(m_num_threads, self_idx);

    // Walks the queues in probe order and consumes the first task that
    // pop_task() yields. Returns true iff a task was consumed.
    auto run_one_task = [&]() -> bool {
      for (auto queue_idx : probe_order) {
        auto task = m_states[queue_idx]->pop_task(self_state);
        if (task) {
          consume(self_state, *task);
          return true;
        }
      }
      return false;
    };

    for (;;) {
      // Keep going for as long as some queue still hands us a task.
      while (run_one_task()) {
      }

      // Nothing found in any queue on this pass.
      self_state->set_running(false);
      if (!m_can_push_task) {
        // No new tasks can be added anymore, so there is no reason to wait
        // for the jobs that are still running to finish.
        return;
      }

      // This thread may quit once no thread is running and no queue holds a
      // task.
      const bool nothing_left = m_state_counters.num_running == 0 &&
                                m_state_counters.num_non_empty == 0;
      if (nothing_left) {
        // Wake up everyone who might be waiting, so that they can quit too.
        m_state_counters.waiter->give(m_state_counters.num_all);
        return;
      }

      // Wait for work.
      m_state_counters.waiter->take();
    }
  };

  // Count the queues that already contain work before any worker starts.
  for (size_t i = 0; i < m_num_threads; ++i) {
    if (!m_states[i]->m_queue.empty()) {
      ++m_state_counters.num_non_empty;
    }
  }

  // Launch one thread per worker state, each with an 8 MiB stack.
  constexpr size_t kWorkerStackSize = 8 * 1024 * 1024;
  std::vector<boost::thread> workers;
  workers.reserve(m_num_threads);
  for (size_t i = 0; i < m_num_threads; ++i) {
    boost::thread::attributes thread_attrs;
    thread_attrs.set_stack_size(kWorkerStackSize);
    workers.emplace_back(thread_attrs,
                         std::bind<void>(worker_loop, m_states[i].get(), i));
  }

  // Block until every worker has returned.
  for (auto& worker_thread : workers) {
    worker_thread.join();
  }

  // Once all workers are done, every queue is expected to be empty.
  for (size_t i = 0; i < m_num_threads; ++i) {
    assert(m_states[i]->m_queue.empty());
  }
}