void ThreadPool::DispatchThread()

in src/kudu/util/threadpool.cc [598:746]


void ThreadPool::DispatchThread() {
  MutexLock unique_lock(lock_);
  InsertOrDie(&threads_, Thread::current_thread());
  DCHECK_GT(num_threads_pending_start_, 0);
  num_threads_++;
  num_threads_pending_start_--;
  // If we are one of the first 'min_threads_' to start, we must be
  // a "permanent" thread.
  bool permanent = num_threads_ <= min_threads_;

  // Owned by this worker thread and added/removed from idle_threads_ as needed.
  IdleThread me(&lock_);

  while (true) {
    // Note: Status::Aborted() is used to indicate normal shutdown.
    if (!pool_status_.ok()) {
      VLOG(2) << "DispatchThread exiting: " << pool_status_.ToString();
      break;
    }

    if (queue_.empty()) {
      // There's no work to do, let's go idle.
      //
      // Note: if FIFO behavior is desired, it's as simple as changing this to push_back().
      idle_threads_.push_front(me);
      SCOPED_CLEANUP({
        // For some wake ups (i.e. Shutdown or DoSubmit) this thread is
        // guaranteed to be unlinked after being awakened. In others (i.e.
        // spurious wake-up or Wait timeout), it'll still be linked.
        if (me.is_linked()) {
          idle_threads_.erase(idle_threads_.iterator_to(me));
        }
      });
      if (permanent) {
        me.not_empty.Wait();
      } else {
        if (!me.not_empty.WaitFor(idle_timeout_)) {
          // After much investigation, it appears that pthread condition variables have
          // a weird behavior in which they can return ETIMEDOUT from timed_wait even if
          // another thread did in fact signal. Apparently after a timeout there is some
          // brief period during which another thread may actually grab the internal mutex
          // protecting the state, signal, and release again before we get the mutex. So,
          // we'll recheck the empty queue case regardless.
          if (queue_.empty()) {
            VLOG(3) << "Releasing worker thread from pool " << name_ << " after "
                    << idle_timeout_.ToMilliseconds() << "ms of idle time.";
            break;
          }
        }
      }
      continue;
    }

    // Get the next token and task to execute.
    ThreadPoolToken* token = queue_.front();
    queue_.pop_front();
    DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state());
    DCHECK(!token->entries_.empty());
    Task task = std::move(token->entries_.front());
    token->entries_.pop_front();
    token->active_threads_++;
    --total_queued_tasks_;
    ++active_threads_;

    unique_lock.Unlock();

    // Release the reference which was held by the queued item.
    ADOPT_TRACE(task.trace);
    if (task.trace) {
      task.trace->Release();
    }

    // Update metrics
    MonoTime now(MonoTime::Now());
    int64_t queue_time_us = (now - task.submit_time).ToMicroseconds();
    TRACE_COUNTER_INCREMENT(queue_time_trace_metric_name_, queue_time_us);
    if (metrics_.queue_time_us_histogram) {
      metrics_.queue_time_us_histogram->Increment(queue_time_us);
    }
    if (token->metrics_.queue_time_us_histogram) {
      token->metrics_.queue_time_us_histogram->Increment(queue_time_us);
    }

    // Execute the task
    {
      MicrosecondsInt64 start_wall_us = GetMonoTimeMicros();
      MicrosecondsInt64 start_cpu_us = GetThreadCpuTimeMicros();

      task.runnable->Run();

      int64_t wall_us = GetMonoTimeMicros() - start_wall_us;
      int64_t cpu_us = GetThreadCpuTimeMicros() - start_cpu_us;

      if (metrics_.run_time_us_histogram) {
        metrics_.run_time_us_histogram->Increment(wall_us);
      }
      if (token->metrics_.run_time_us_histogram) {
        token->metrics_.run_time_us_histogram->Increment(wall_us);
      }
      TRACE_COUNTER_INCREMENT(run_wall_time_trace_metric_name_, wall_us);
      TRACE_COUNTER_INCREMENT(run_cpu_time_trace_metric_name_, cpu_us);
    }
    // Destruct the task while we do not hold the lock.
    //
    // The task's destructor may be expensive if it has a lot of bound
    // objects, and we don't want to block submission of the threadpool.
    // In the worst case, the destructor might even try to do something
    // with this threadpool, and produce a deadlock.
    task.runnable.reset();
    unique_lock.Lock();

    // Possible states:
    // 1. The token was shut down while we ran its task. Transition to QUIESCED.
    // 2. The token has no more queued tasks. Transition back to IDLE.
    // 3. The token has more tasks. Requeue it and transition back to RUNNABLE.
    ThreadPoolToken::State state = token->state();
    DCHECK(state == ThreadPoolToken::State::RUNNING ||
           state == ThreadPoolToken::State::QUIESCING);
    if (--token->active_threads_ == 0) {
      if (state == ThreadPoolToken::State::QUIESCING) {
        DCHECK(token->entries_.empty());
        token->Transition(ThreadPoolToken::State::QUIESCED);
      } else if (token->entries_.empty()) {
        token->Transition(ThreadPoolToken::State::IDLE);
      } else if (token->mode() == ExecutionMode::SERIAL) {
        queue_.emplace_back(token);
      }
    }
    if (--active_threads_ == 0) {
      idle_cond_.Broadcast();
    }
  }

  // It's important that we hold the lock between exiting the loop and dropping
  // num_threads_. Otherwise it's possible someone else could come along here
  // and add a new task just as the last running thread is about to exit.
  CHECK(unique_lock.OwnsLock());

  CHECK_EQ(threads_.erase(Thread::current_thread()), 1);
  num_threads_--;
  if (num_threads_ + num_threads_pending_start_ == 0) {
    no_threads_cond_.Broadcast();

    // Sanity check: if we're the last thread exiting, the queue ought to be
    // empty. Otherwise it will never get processed.
    CHECK(queue_.empty());
    DCHECK_EQ(0, total_queued_tasks_);
  }
}