in log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java [509:648]
public void run() {
    // Writer loop: until shutdown is requested, persisted events are sent whenever at least
    // batchSize entries are stored, or whenever any entries are stored and the configured delay
    // has elapsed. Otherwise the thread waits on the gate until the next deadline.
    // When batchSize > 1 the work is delegated to sendBatch(); otherwise events are sent one at
    // a time from a cursor inside a single transaction.
    LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
    long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
    while (!shutdown) {
        final long nowMillis = System.currentTimeMillis();
        final long dbCount = database.count();
        // Re-synchronize the shared counter with the real number of stored entries.
        dbCounter.set(dbCount);
        if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
            nextBatchMillis = nowMillis + manager.getDelayMillis();
            try {
                boolean errors = false;
                final DatabaseEntry key = new DatabaseEntry();
                final DatabaseEntry data = new DatabaseEntry();
                // NOTE(review): presumably makes later waitForOpen() calls block until a producer
                // reopens the gate - confirm against the Gate implementation.
                gate.close();
                if (batchSize > 1) {
                    try {
                        // The return value is treated as an "errors occurred" flag below.
                        errors = sendBatch(key, data);
                    } catch (final Exception ex) {
                        // Leaving the loop ends the writer thread, so record why instead of
                        // exiting silently.
                        LOGGER.error("Error sending batch, WriterThread is stopping", ex);
                        break;
                    }
                } else {
                    // Holds the last lock conflict; non-null after the loop means every retry failed.
                    Exception exception = null;
                    for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                        exception = null;
                        Transaction txn = null;
                        Cursor cursor = null;
                        try {
                            txn = environment.beginTransaction(null, null);
                            cursor = database.openCursor(txn, null);
                            try {
                                OperationStatus status = cursor.getFirst(key, data, LockMode.RMW);
                                while (status == OperationStatus.SUCCESS) {
                                    final SimpleEvent event = createEvent(data);
                                    // A null event is skipped: it is neither sent nor deleted.
                                    if (event != null) {
                                        try {
                                            manager.doSend(event);
                                        } catch (final Exception ioe) {
                                            // Stop at the first failed send; deletions made so far
                                            // are still committed below.
                                            errors = true;
                                            LOGGER.error("Error sending event", ioe);
                                            break;
                                        }
                                        try {
                                            cursor.delete();
                                        } catch (final Exception ex) {
                                            LOGGER.error("Unable to delete event", ex);
                                        }
                                    }
                                    status = cursor.getNext(key, data, LockMode.RMW);
                                }
                                // The cursor is closed before the transaction is committed.
                                cursor.close();
                                cursor = null;
                                txn.commit();
                                txn = null;
                                // Decremented once per committed transaction; the counter is reset
                                // from database.count() at the top of the next loop iteration.
                                dbCounter.decrementAndGet();
                                exception = null;
                                break;
                            } catch (final LockConflictException lce) {
                                exception = lce;
                                // Fall through and retry.
                            } catch (final Exception ex) {
                                LOGGER.error("Error reading or writing to database", ex);
                                shutdown = true;
                                break;
                            } finally {
                                if (cursor != null) {
                                    cursor.close();
                                    cursor = null;
                                }
                                if (txn != null) {
                                    txn.abort();
                                    txn = null;
                                }
                            }
                        } catch (final LockConflictException lce) {
                            // Lock conflict while beginning the transaction, opening the cursor,
                            // or cleaning up: remember it and retry.
                            exception = lce;
                        } finally {
                            // Best-effort cleanup for every exit path, not only lock conflicts, so
                            // that a failure in openCursor() or in the inner cleanup cannot leave
                            // the transaction open. Both references are already null on the normal
                            // paths, making this a no-op there.
                            if (cursor != null) {
                                try {
                                    cursor.close();
                                } catch (final Exception ex) {
                                    LOGGER.trace("Ignored exception closing cursor.", ex);
                                }
                                cursor = null;
                            }
                            if (txn != null) {
                                try {
                                    txn.abort();
                                } catch (final Exception ex) {
                                    LOGGER.trace("Ignored exception aborting tx.", ex);
                                }
                                txn = null;
                            }
                        }
                        try {
                            // Back off before the next attempt.
                            Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                        } catch (final InterruptedException ie) {
                            // Ignore the error; the next retry simply starts early.
                        }
                    }
                    if (exception != null) {
                        LOGGER.error("Unable to read or update data base", exception);
                    }
                }
                if (errors) {
                    // Wait out the configured delay before trying again after a failed send.
                    Thread.sleep(manager.getDelayMillis());
                    continue;
                }
            } catch (final Exception ex) {
                LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
            }
        } else {
            // Nothing due yet: wait until the next deadline or until the gate is opened.
            if (nextBatchMillis <= nowMillis) {
                nextBatchMillis = nowMillis + manager.getDelayMillis();
            }
            try {
                final long interval = nextBatchMillis - nowMillis;
                gate.waitForOpen(interval);
            } catch (final InterruptedException ie) {
                // The loop condition re-checks the shutdown flag on the next pass.
                LOGGER.warn("WriterThread interrupted, continuing");
            } catch (final Exception ex) {
                LOGGER.error("WriterThread encountered an exception waiting for work", ex);
                break;
            }
        }
    }
    // In batch mode, make one last attempt to send whatever is still stored before exiting.
    if (batchSize > 1 && database.count() > 0) {
        final DatabaseEntry key = new DatabaseEntry();
        final DatabaseEntry data = new DatabaseEntry();
        try {
            sendBatch(key, data);
        } catch (final Exception ex) {
            // Include the cause so the failure can be diagnosed.
            LOGGER.warn("Unable to write final batch", ex);
        }
    }
    LOGGER.trace("WriterThread exiting");
}