in log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java [650:787]
/**
 * Reads up to {@code batchSize} events from the database, sends them through the manager as one
 * batch and, when the send succeeded, deletes the sent events from the database in a single
 * transaction.
 *
 * <p>Lock conflicts while reading or while deleting are retried up to
 * {@code lockTimeoutRetryCount} times. Every read attempt starts from an empty batch, so a retry
 * never adds the same stored event twice.</p>
 *
 * @param key  entry the cursor fills with the key of each record read.
 * @param data entry the cursor fills with the payload of each record read.
 * @return {@code true} if sending the batch failed; {@code false} otherwise, including the case
 *         where the send succeeded but the events could not be deleted afterwards.
 * @throws Exception any exception not handled by the retry logic; it is logged, {@code shutdown}
 *         is set to {@code true} and the exception is rethrown.
 */
private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
    boolean errors = false;
    OperationStatus status;
    Cursor cursor = null;
    try {
        BatchEvent batch = new BatchEvent();
        for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
            if (retryIndex > 0) {
                // The previous attempt may have collected some events before the lock conflict.
                // The cursor restarts at the first record, so drop them to avoid duplicates.
                batch = new BatchEvent();
            }
            try {
                cursor = database.openCursor(null, CursorConfig.DEFAULT);
                status = cursor.getFirst(key, data, null);
                for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
                    final SimpleEvent event = createEvent(data);
                    if (event != null) {
                        batch.addEvent(event);
                    }
                    status = cursor.getNext(key, data, null);
                }
                break;
            } catch (final LockConflictException lce) {
                if (cursor != null) {
                    try {
                        cursor.close();
                        cursor = null;
                    } catch (final Exception ex) {
                        LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                    }
                }
            }
        }

        try {
            manager.send(batch);
        } catch (final Exception ioe) {
            LOGGER.error("Error sending events", ioe);
            errors = true;
        }

        if (!errors) {
            // The cursor is closed before the deletes start; from here on it is always null.
            if (cursor != null) {
                cursor.close();
                cursor = null;
            }
            Exception exception = null;
            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                Transaction txn = null;
                try {
                    // A non-lock-conflict failure here is not retried; it propagates to the
                    // outer handler below.
                    txn = environment.beginTransaction(null, null);
                    try {
                        for (final Event event : batch.getEvents()) {
                            try {
                                final Map<String, String> headers = event.getHeaders();
                                final DatabaseEntry sentKey =
                                        new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
                                database.delete(txn, sentKey);
                            } catch (final Exception ex) {
                                // NOTE(review): this also catches a LockConflictException raised by
                                // delete(), so such a conflict is logged rather than retried -
                                // confirm that this best-effort behaviour is intended.
                                LOGGER.error("Error deleting key from database", ex);
                            }
                        }
                        txn.commit();
                        // Committed: clear the handle so the finally block does not abort it.
                        txn = null;
                        long count = dbCounter.get();
                        while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
                            count = dbCounter.get();
                        }
                        exception = null;
                        break;
                    } catch (final LockConflictException lce) {
                        // Handled together with conflicts raised by beginTransaction below.
                        throw lce;
                    } catch (final Exception ex) {
                        // The transaction is aborted by the finally block.
                        LOGGER.error("Unable to commit transaction", ex);
                    }
                } catch (final LockConflictException lce) {
                    exception = lce;
                    if (txn != null) {
                        try {
                            txn.abort();
                            txn = null;
                        } catch (final Exception ex) {
                            LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
                        }
                    }
                } finally {
                    if (txn != null) {
                        txn.abort();
                    }
                }
                // Only reached after a failed attempt: pause before retrying.
                try {
                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                } catch (final InterruptedException ie) {
                    // Deliberately ignored and the interrupt status is not restored.
                    // NOTE(review): confirm the database calls that follow tolerate an
                    // interrupted thread before changing this.
                }
            }
            if (exception != null) {
                LOGGER.error("Unable to delete events from data base", exception);
            }
        }
    } catch (final Exception ex) {
        LOGGER.error("Error reading database", ex);
        shutdown = true;
        throw ex;
    } finally {
        if (cursor != null) {
            cursor.close();
        }
    }
    return errors;
}