bool EventBase::loopBody()

in folly/io/async/EventBase.cpp [323:479]


bool EventBase::loopBody(int flags, bool ignoreKeepAlive) {
  VLOG(5) << "EventBase(): Starting loop.";

  const char* message =
      "Your code just tried to loop over an event base from inside another "
      "event base loop. Since libevent is not reentrant, this leads to "
      "undefined behavior in opt builds. Please fix immediately. For the "
      "common case of an inner function that needs to do some synchronous "
      "computation on an event-base, replace getEventBase() by a new, "
      "stack-allocated EventBase.";

  LOG_IF(DFATAL, invokingLoop_) << message;

  invokingLoop_ = true;
  SCOPE_EXIT { invokingLoop_ = false; };

  int res = 0;
  bool ranLoopCallbacks;
  bool blocking = !(flags & EVLOOP_NONBLOCK);
  bool once = (flags & EVLOOP_ONCE);

  // time-measurement variables.
  std::chrono::steady_clock::time_point prev;
  std::chrono::steady_clock::time_point idleStart = {};
  std::chrono::microseconds busy;
  std::chrono::microseconds idle;

  auto const prevLoopThread = loopThread_.exchange(
      std::this_thread::get_id(), std::memory_order_release);
  CHECK_EQ(std::thread::id(), prevLoopThread)
      << "Driving an EventBase in one thread (" << std::this_thread::get_id()
      << ") while it is already being driven in another thread ("
      << prevLoopThread << ") is forbidden.";

  if (!name_.empty()) {
    setThreadName(name_);
  }

  if (enableTimeMeasurement_) {
    prev = std::chrono::steady_clock::now();
    idleStart = prev;
  }

  while (!stop_.load(std::memory_order_relaxed)) {
    if (!ignoreKeepAlive) {
      applyLoopKeepAlive();
    }
    ++nextLoopCnt_;

    // Run the before loop callbacks
    LoopCallbackList callbacks;
    callbacks.swap(runBeforeLoopCallbacks_);

    runLoopCallbacks(callbacks);

    // nobody can add loop callbacks from within this thread if
    // we don't have to handle anything to start with...
    if (blocking && loopCallbacks_.empty()) {
      res = evb_->eb_event_base_loop(EVLOOP_ONCE);
    } else {
      res = evb_->eb_event_base_loop(EVLOOP_ONCE | EVLOOP_NONBLOCK);
    }

    ranLoopCallbacks = runLoopCallbacks();

    if (enableTimeMeasurement_) {
      auto now = std::chrono::steady_clock::now();
      busy = std::chrono::duration_cast<std::chrono::microseconds>(
          now - startWork_);
      idle = std::chrono::duration_cast<std::chrono::microseconds>(
          startWork_ - idleStart);
      auto loop_time = busy + idle;

      avgLoopTime_.addSample(loop_time, busy);
      maxLatencyLoopTime_.addSample(loop_time, busy);

      if (observer_) {
        if (observerSampleCount_++ == observer_->getSampleRate()) {
          observerSampleCount_ = 0;
          observer_->loopSample(busy.count(), idle.count());
        }
      }

      VLOG(11) << "EventBase " << this << " did not timeout "
               << " loop time guess: " << loop_time.count()
               << " idle time: " << idle.count()
               << " busy time: " << busy.count()
               << " avgLoopTime: " << avgLoopTime_.get()
               << " maxLatencyLoopTime: " << maxLatencyLoopTime_.get()
               << " maxLatency_: " << maxLatency_.count() << "us"
               << " notificationQueueSize: " << getNotificationQueueSize()
               << " nothingHandledYet(): " << nothingHandledYet();

      if (maxLatency_ > std::chrono::microseconds::zero()) {
        // see if our average loop time has exceeded our limit
        if (dampenMaxLatency_ &&
            (maxLatencyLoopTime_.get() > double(maxLatency_.count()))) {
          maxLatencyCob_();
          // back off temporarily -- don't keep spamming maxLatencyCob_
          // if we're only a bit over the limit
          maxLatencyLoopTime_.dampen(0.9);
        } else if (!dampenMaxLatency_ && busy > maxLatency_) {
          // If no damping, we compare the raw busy time
          maxLatencyCob_();
        }
      }

      // Our loop run did real work; reset the idle timer
      idleStart = now;
    } else {
      VLOG(11) << "EventBase " << this << " did not timeout";
    }

    // Event loop indicated that there were no more events (NotificationQueue
    // was registered as an internal event and there were no other registered
    // events).
    if (res != 0) {
      // Since Notification Queue is marked 'internal' some events may not have
      // run.  Run them manually if so, and continue looping.
      //
      if (!queue_->empty()) {
        ExecutionObserverScopeGuard guard(executionObserver_, queue_.get());
        queue_->execute();
      } else if (!ranLoopCallbacks) {
        // If there were no more events and we also didn't have any loop
        // callbacks to run, there is nothing left to do.
        break;
      }
    }

    if (enableTimeMeasurement_) {
      VLOG(11) << "EventBase " << this
               << " loop time: " << getTimeDelta(&prev).count();
    }

    if (once) {
      break;
    }
  }
  // Reset stop_ so loop() can be called again
  stop_.store(false, std::memory_order_relaxed);

  if (res < 0) {
    LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
    return false;
  } else if (res == 1) {
    VLOG(5) << "EventBase: ran out of events (exiting loop)!";
  } else if (res > 1) {
    LOG(ERROR) << "EventBase: unknown event loop result = " << res;
    return false;
  }

  loopThread_.store({}, std::memory_order_release);

  VLOG(5) << "EventBase(): Done with loop.";
  return true;
}