in src/main/cpp/asyncappender.cpp [525:606]
/**
 * Dispatch loop: repeatedly collects events from priv->buffer (plus one
 * summary event per priv->discardMap entry) and forwards them to the
 * attached appenders via priv->appenders.appendLoopOnAppenders().
 *
 * The loop ends after the pass in which priv->isClosed() is observed to be
 * true, or after a pass in which an appender threw while the appender was
 * not closed. Before returning, if LogLog debugging is enabled, the total
 * discard count and a histogram of the pendingCount values seen on
 * dispatched buffer slots are written to LogLog.
 */
void AsyncAppender::dispatch()
{
	size_t discardCount = 0;
	// pendingCountHistogram[i] counts dispatched slots whose pendingCount was i;
	// values >= bufferSize are not recorded (see the bounds check below).
	std::vector<size_t> pendingCountHistogram(priv->bufferSize, 0);
	bool isActive = true;

	while (isActive)
	{
		Pool p;
		LoggingEventList events;
		events.reserve(priv->bufferSize);

		// Yield up to twice while nothing new has been committed, before
		// falling back to blocking on the condition variable.
		for (int count = 0; count < 2 && priv->dispatchedCount == priv->commitCount; ++count)
			std::this_thread::yield(); // Wait a bit

		if (priv->dispatchedCount == priv->commitCount)
		{
			// Block until a thread is counted in blockedCount, a new event
			// has been committed, or the appender has been closed.
			std::unique_lock<std::mutex> lock(priv->bufferMutex);
			priv->bufferNotEmpty.wait(lock, [this]() -> bool
				{ return 0 < priv->blockedCount || priv->dispatchedCount != priv->commitCount || priv->closed; }
			);
		}

		// Sampled before draining: when closed, this pass still dispatches
		// whatever is currently available and the loop then exits.
		isActive = !priv->isClosed();

		// Take at most bufferSize committed events from the ring buffer.
		while (events.size() < priv->bufferSize && priv->dispatchedCount != priv->commitCount)
		{
			auto index = priv->dispatchedCount % priv->buffer.size();
			const auto& data = priv->buffer[index];
			events.push_back(data.event);

			if (data.pendingCount < pendingCountHistogram.size())
				++pendingCountHistogram[data.pendingCount];

			++priv->dispatchedCount;
		}

		// Slots have been consumed; wake every waiter on bufferNotFull.
		priv->bufferNotFull.notify_all();

		{
			// Convert each discard summary into an event, accumulate the
			// discard total and reset the map, all under bufferMutex.
			std::lock_guard<std::mutex> lock(priv->bufferMutex);

			// Bind by reference: copying each map entry while holding the
			// lock is unnecessary. Non-const because createEvent() may not
			// be a const member.
			for (auto& discardItem : priv->discardMap)
			{
				events.push_back(discardItem.second.createEvent(p));
				discardCount += discardItem.second.getCount();
			}

			priv->discardMap.clear();
		}

		// Forward the batch. A failure does not abort the batch: remaining
		// events are still attempted, but the outer loop stops afterwards
		// (unless the appender is already closed, in which case the failure
		// is ignored).
		for (const auto& item : events)
		{
			try
			{
				priv->appenders.appendLoopOnAppenders(item, p);
			}
			catch (const std::exception& ex)
			{
				if (!priv->isClosed())
				{
					priv->errorHandler->error(LOG4CXX_STR("async dispatcher"), ex, 0, item);
					isActive = false;
				}
			}
			catch (...)
			{
				if (!priv->isClosed())
				{
					priv->errorHandler->error(LOG4CXX_STR("async dispatcher"));
					isActive = false;
				}
			}
		}
	}

	// Emit dispatch statistics once the loop has finished.
	if (LogLog::isDebugEnabled())
	{
		Pool p;
		LogString msg(LOG4CXX_STR("AsyncAppender"));
		msg += LOG4CXX_STR(" discardCount ");
		StringHelper::toString(discardCount, p, msg);
		msg += LOG4CXX_STR(" pendingCountHistogram");

		for (auto item : pendingCountHistogram)
		{
			msg += logchar(' ');
			StringHelper::toString(item, p, msg);
		}

		LogLog::debug(msg);
	}
}