in src/main/cpp/asyncappender.cpp [283:370]
//
// Hand an event to the attached appenders.
//
// When bufferSize <= 0 the event is passed straight to the attached appenders
// on the calling thread and nothing is queued. Otherwise the event is written
// into the ring buffer for the dispatch thread (started here on first use).
// If no slot can be claimed the caller either waits for the dispatch thread to
// free one (blocking mode) or records the event in the discard map.
//
// event: the logging event to deliver or queue.
// p:     memory pool, only used for the unbuffered (synchronous) delivery.
//
void AsyncAppender::append(const spi::LoggingEventPtr& event, Pool& p)
{
	if (priv->bufferSize <= 0)
	{
		// Buffering is disabled: deliver synchronously and stop here.
		// Without this return the code below could never claim a slot
		// (pendingCount < bufferSize is always false), so a blocking caller
		// would wait forever on bufferNotFull and a non-blocking caller would
		// record the already delivered event as discarded.
		priv->appenders.appendLoopOnAppenders(event, p);
		return;
	}

	// Get a copy of this thread's diagnostic context
	event->LoadDC();

	// Start the dispatch thread on first use.
	// The joinable() test is repeated under the mutex so that only one
	// of several concurrently appending threads creates the dispatcher.
	if (!priv->dispatcher.joinable())
	{
		std::lock_guard<std::recursive_mutex> lock(priv->mutex);
		if (!priv->dispatcher.joinable())
			priv->dispatcher = ThreadUtility::instance()->createThread( LOG4CXX_STR("AsyncAppender"), &AsyncAppender::dispatch, this );
	}

	while (true)
	{
		auto pendingCount = priv->eventCount - priv->dispatchedCount;
		if (0 <= pendingCount && pendingCount < priv->bufferSize)
		{
			// Claim a slot in the ring buffer
			auto oldEventCount = priv->eventCount++;
			// NOTE(review): assumes priv->buffer is non-empty whenever bufferSize > 0 - confirm where the buffer is resized
			auto index = oldEventCount % priv->buffer.size();
			// Wait for a free slot
			// (other threads may have claimed slots since pendingCount was sampled)
			while (priv->bufferSize <= oldEventCount - priv->dispatchedCount)
				std::this_thread::yield(); // Allow the dispatch thread to free a slot
			// Write to the ring buffer
			priv->buffer[index] = AsyncAppenderPriv::EventData{event, pendingCount};
			// Notify the dispatch thread that an event has been added.
			// commitCount only advances from oldEventCount to oldEventCount + 1,
			// so this thread spins until every earlier claimed slot has been committed.
			auto failureCount = 0;
			auto savedEventCount = oldEventCount;
			while (!priv->commitCount.compare_exchange_weak(oldEventCount, oldEventCount + 1, std::memory_order_release))
			{
				// A failed exchange overwrites the expected value; restore it before retrying
				oldEventCount = savedEventCount;
				if (2 < ++failureCount) // Did the scheduler suspend a thread between claiming a slot and advancing commitCount?
					std::this_thread::yield(); // Wait a bit
			}
			priv->bufferNotEmpty.notify_all();
			break;
		}

		//
		// Following code is only reachable if buffer is full or eventCount has overflowed
		//
		std::unique_lock<std::mutex> lock(priv->bufferMutex);
		priv->bufferNotEmpty.notify_all();

		//
		// If blocking is enabled, the appender is not closed
		// and the caller is not the dispatch thread then
		// wait until the buffer has a free slot
		//
		bool discard = true;
		if (priv->blocking
			&& !priv->closed
			&& (priv->dispatcher.get_id() != std::this_thread::get_id()) )
		{
			++priv->blockedCount;
			priv->bufferNotFull.wait(lock, [this]()
			{
				return priv->eventCount - priv->dispatchedCount < priv->bufferSize;
			});
			--priv->blockedCount;
			// Space is available again: loop round and retry claiming a slot
			discard = false;
		}

		//
		// Otherwise (not blocking, closed, or called on the dispatch thread)
		// add the event to the discard map (bufferMutex is still held here).
		//
		if (discard)
		{
			LogString loggerName = event->getLoggerName();
			DiscardMap::iterator iter = priv->discardMap.find(loggerName);
			if (iter == priv->discardMap.end())
			{
				// First discarded event for this logger name
				DiscardSummary summary(event);
				priv->discardMap.insert(DiscardMap::value_type(loggerName, summary));
			}
			else
			{
				(*iter).second.add(event);
			}
			break;
		}
	}
}