in folly/io/async/EventBase.cpp [323:479]
bool EventBase::loopBody(int flags, bool ignoreKeepAlive) {
VLOG(5) << "EventBase(): Starting loop.";
const char* message =
"Your code just tried to loop over an event base from inside another "
"event base loop. Since libevent is not reentrant, this leads to "
"undefined behavior in opt builds. Please fix immediately. For the "
"common case of an inner function that needs to do some synchronous "
"computation on an event-base, replace getEventBase() by a new, "
"stack-allocated EventBase.";
LOG_IF(DFATAL, invokingLoop_) << message;
invokingLoop_ = true;
SCOPE_EXIT { invokingLoop_ = false; };
int res = 0;
bool ranLoopCallbacks;
bool blocking = !(flags & EVLOOP_NONBLOCK);
bool once = (flags & EVLOOP_ONCE);
// time-measurement variables.
std::chrono::steady_clock::time_point prev;
std::chrono::steady_clock::time_point idleStart = {};
std::chrono::microseconds busy;
std::chrono::microseconds idle;
auto const prevLoopThread = loopThread_.exchange(
std::this_thread::get_id(), std::memory_order_release);
CHECK_EQ(std::thread::id(), prevLoopThread)
<< "Driving an EventBase in one thread (" << std::this_thread::get_id()
<< ") while it is already being driven in another thread ("
<< prevLoopThread << ") is forbidden.";
if (!name_.empty()) {
setThreadName(name_);
}
if (enableTimeMeasurement_) {
prev = std::chrono::steady_clock::now();
idleStart = prev;
}
while (!stop_.load(std::memory_order_relaxed)) {
if (!ignoreKeepAlive) {
applyLoopKeepAlive();
}
++nextLoopCnt_;
// Run the before loop callbacks
LoopCallbackList callbacks;
callbacks.swap(runBeforeLoopCallbacks_);
runLoopCallbacks(callbacks);
// nobody can add loop callbacks from within this thread if
// we don't have to handle anything to start with...
if (blocking && loopCallbacks_.empty()) {
res = evb_->eb_event_base_loop(EVLOOP_ONCE);
} else {
res = evb_->eb_event_base_loop(EVLOOP_ONCE | EVLOOP_NONBLOCK);
}
ranLoopCallbacks = runLoopCallbacks();
if (enableTimeMeasurement_) {
auto now = std::chrono::steady_clock::now();
busy = std::chrono::duration_cast<std::chrono::microseconds>(
now - startWork_);
idle = std::chrono::duration_cast<std::chrono::microseconds>(
startWork_ - idleStart);
auto loop_time = busy + idle;
avgLoopTime_.addSample(loop_time, busy);
maxLatencyLoopTime_.addSample(loop_time, busy);
if (observer_) {
if (observerSampleCount_++ == observer_->getSampleRate()) {
observerSampleCount_ = 0;
observer_->loopSample(busy.count(), idle.count());
}
}
VLOG(11) << "EventBase " << this << " did not timeout "
<< " loop time guess: " << loop_time.count()
<< " idle time: " << idle.count()
<< " busy time: " << busy.count()
<< " avgLoopTime: " << avgLoopTime_.get()
<< " maxLatencyLoopTime: " << maxLatencyLoopTime_.get()
<< " maxLatency_: " << maxLatency_.count() << "us"
<< " notificationQueueSize: " << getNotificationQueueSize()
<< " nothingHandledYet(): " << nothingHandledYet();
if (maxLatency_ > std::chrono::microseconds::zero()) {
// see if our average loop time has exceeded our limit
if (dampenMaxLatency_ &&
(maxLatencyLoopTime_.get() > double(maxLatency_.count()))) {
maxLatencyCob_();
// back off temporarily -- don't keep spamming maxLatencyCob_
// if we're only a bit over the limit
maxLatencyLoopTime_.dampen(0.9);
} else if (!dampenMaxLatency_ && busy > maxLatency_) {
// If no damping, we compare the raw busy time
maxLatencyCob_();
}
}
// Our loop run did real work; reset the idle timer
idleStart = now;
} else {
VLOG(11) << "EventBase " << this << " did not timeout";
}
// Event loop indicated that there were no more events (NotificationQueue
// was registered as an internal event and there were no other registered
// events).
if (res != 0) {
// Since Notification Queue is marked 'internal' some events may not have
// run. Run them manually if so, and continue looping.
//
if (!queue_->empty()) {
ExecutionObserverScopeGuard guard(executionObserver_, queue_.get());
queue_->execute();
} else if (!ranLoopCallbacks) {
// If there were no more events and we also didn't have any loop
// callbacks to run, there is nothing left to do.
break;
}
}
if (enableTimeMeasurement_) {
VLOG(11) << "EventBase " << this
<< " loop time: " << getTimeDelta(&prev).count();
}
if (once) {
break;
}
}
// Reset stop_ so loop() can be called again
stop_.store(false, std::memory_order_relaxed);
if (res < 0) {
LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
return false;
} else if (res == 1) {
VLOG(5) << "EventBase: ran out of events (exiting loop)!";
} else if (res > 1) {
LOG(ERROR) << "EventBase: unknown event loop result = " << res;
return false;
}
loopThread_.store({}, std::memory_order_release);
VLOG(5) << "EventBase(): Done with loop.";
return true;
}