in src/kudu/util/threadpool.cc [598:746]
void ThreadPool::DispatchThread() {
MutexLock unique_lock(lock_);
InsertOrDie(&threads_, Thread::current_thread());
DCHECK_GT(num_threads_pending_start_, 0);
num_threads_++;
num_threads_pending_start_--;
// If we are one of the first 'min_threads_' to start, we must be
// a "permanent" thread.
bool permanent = num_threads_ <= min_threads_;
// Owned by this worker thread and added/removed from idle_threads_ as needed.
IdleThread me(&lock_);
while (true) {
// Note: Status::Aborted() is used to indicate normal shutdown.
if (!pool_status_.ok()) {
VLOG(2) << "DispatchThread exiting: " << pool_status_.ToString();
break;
}
if (queue_.empty()) {
// There's no work to do, let's go idle.
//
// Note: if FIFO behavior is desired, it's as simple as changing this to push_back().
idle_threads_.push_front(me);
SCOPED_CLEANUP({
// For some wake ups (i.e. Shutdown or DoSubmit) this thread is
// guaranteed to be unlinked after being awakened. In others (i.e.
// spurious wake-up or Wait timeout), it'll still be linked.
if (me.is_linked()) {
idle_threads_.erase(idle_threads_.iterator_to(me));
}
});
if (permanent) {
me.not_empty.Wait();
} else {
if (!me.not_empty.WaitFor(idle_timeout_)) {
// After much investigation, it appears that pthread condition variables have
// a weird behavior in which they can return ETIMEDOUT from timed_wait even if
// another thread did in fact signal. Apparently after a timeout there is some
// brief period during which another thread may actually grab the internal mutex
// protecting the state, signal, and release again before we get the mutex. So,
// we'll recheck the empty queue case regardless.
if (queue_.empty()) {
VLOG(3) << "Releasing worker thread from pool " << name_ << " after "
<< idle_timeout_.ToMilliseconds() << "ms of idle time.";
break;
}
}
}
continue;
}
// Get the next token and task to execute.
ThreadPoolToken* token = queue_.front();
queue_.pop_front();
DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state());
DCHECK(!token->entries_.empty());
Task task = std::move(token->entries_.front());
token->entries_.pop_front();
token->active_threads_++;
--total_queued_tasks_;
++active_threads_;
unique_lock.Unlock();
// Release the reference which was held by the queued item.
ADOPT_TRACE(task.trace);
if (task.trace) {
task.trace->Release();
}
// Update metrics
MonoTime now(MonoTime::Now());
int64_t queue_time_us = (now - task.submit_time).ToMicroseconds();
TRACE_COUNTER_INCREMENT(queue_time_trace_metric_name_, queue_time_us);
if (metrics_.queue_time_us_histogram) {
metrics_.queue_time_us_histogram->Increment(queue_time_us);
}
if (token->metrics_.queue_time_us_histogram) {
token->metrics_.queue_time_us_histogram->Increment(queue_time_us);
}
// Execute the task
{
MicrosecondsInt64 start_wall_us = GetMonoTimeMicros();
MicrosecondsInt64 start_cpu_us = GetThreadCpuTimeMicros();
task.runnable->Run();
int64_t wall_us = GetMonoTimeMicros() - start_wall_us;
int64_t cpu_us = GetThreadCpuTimeMicros() - start_cpu_us;
if (metrics_.run_time_us_histogram) {
metrics_.run_time_us_histogram->Increment(wall_us);
}
if (token->metrics_.run_time_us_histogram) {
token->metrics_.run_time_us_histogram->Increment(wall_us);
}
TRACE_COUNTER_INCREMENT(run_wall_time_trace_metric_name_, wall_us);
TRACE_COUNTER_INCREMENT(run_cpu_time_trace_metric_name_, cpu_us);
}
// Destruct the task while we do not hold the lock.
//
// The task's destructor may be expensive if it has a lot of bound
// objects, and we don't want to block submission of the threadpool.
// In the worst case, the destructor might even try to do something
// with this threadpool, and produce a deadlock.
task.runnable.reset();
unique_lock.Lock();
// Possible states:
// 1. The token was shut down while we ran its task. Transition to QUIESCED.
// 2. The token has no more queued tasks. Transition back to IDLE.
// 3. The token has more tasks. Requeue it and transition back to RUNNABLE.
ThreadPoolToken::State state = token->state();
DCHECK(state == ThreadPoolToken::State::RUNNING ||
state == ThreadPoolToken::State::QUIESCING);
if (--token->active_threads_ == 0) {
if (state == ThreadPoolToken::State::QUIESCING) {
DCHECK(token->entries_.empty());
token->Transition(ThreadPoolToken::State::QUIESCED);
} else if (token->entries_.empty()) {
token->Transition(ThreadPoolToken::State::IDLE);
} else if (token->mode() == ExecutionMode::SERIAL) {
queue_.emplace_back(token);
}
}
if (--active_threads_ == 0) {
idle_cond_.Broadcast();
}
}
// It's important that we hold the lock between exiting the loop and dropping
// num_threads_. Otherwise it's possible someone else could come along here
// and add a new task just as the last running thread is about to exit.
CHECK(unique_lock.OwnsLock());
CHECK_EQ(threads_.erase(Thread::current_thread()), 1);
num_threads_--;
if (num_threads_ + num_threads_pending_start_ == 0) {
no_threads_cond_.Broadcast();
// Sanity check: if we're the last thread exiting, the queue ought to be
// empty. Otherwise it will never get processed.
CHECK(queue_.empty());
DCHECK_EQ(0, total_queued_tasks_);
}
}