private boolean sendBatch()

in log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java [650:787]


        private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
            boolean errors = false;
            OperationStatus status;
            Cursor cursor = null;
            try {
                final BatchEvent batch = new BatchEvent();
                for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                    try {
                        cursor = database.openCursor(null, CursorConfig.DEFAULT);
                        status = cursor.getFirst(key, data, null);

                        for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
                            final SimpleEvent event = createEvent(data);
                            if (event != null) {
                                batch.addEvent(event);
                            }
                            status = cursor.getNext(key, data, null);
                        }
                        break;
                    } catch (final LockConflictException lce) {
                        if (cursor != null) {
                            try {
                                cursor.close();
                                cursor = null;
                            } catch (final Exception ex) {
                                LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                            }
                        }
                    }
                }

                try {
                    manager.send(batch);
                } catch (final Exception ioe) {
                    LOGGER.error("Error sending events", ioe);
                    errors = true;
                }
                if (!errors) {
                    if (cursor != null) {
                        cursor.close();
                        cursor = null;
                    }
                    Transaction txn = null;
                    Exception exception = null;
                    for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                        try {
                            txn = environment.beginTransaction(null, null);
                            try {
                                for (final Event event : batch.getEvents()) {
                                    try {
                                        final Map<String, String> headers = event.getHeaders();
                                        key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
                                        database.delete(txn, key);
                                    } catch (final Exception ex) {
                                        LOGGER.error("Error deleting key from database", ex);
                                    }
                                }
                                txn.commit();
                                long count = dbCounter.get();
                                while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
                                    count = dbCounter.get();
                                }
                                exception = null;
                                break;
                            } catch (final LockConflictException lce) {
                                exception = lce;
                                if (cursor != null) {
                                    try {
                                        cursor.close();
                                        cursor = null;
                                    } catch (final Exception ex) {
                                        LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                    }
                                }
                                if (txn != null) {
                                    try {
                                        txn.abort();
                                        txn = null;
                                    } catch (final Exception ex) {
                                        LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
                                    }
                                }
                            } catch (final Exception ex) {
                                LOGGER.error("Unable to commit transaction", ex);
                                if (txn != null) {
                                    txn.abort();
                                }
                            }
                        } catch (final LockConflictException lce) {
                            exception = lce;
                            if (cursor != null) {
                                try {
                                    cursor.close();
                                    cursor = null;
                                } catch (final Exception ex) {
                                    LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                }
                            }
                            if (txn != null) {
                                try {
                                    txn.abort();
                                    txn = null;
                                } catch (final Exception ex) {
                                    LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
                                }
                            }
                        } finally {
                            if (cursor != null) {
                                cursor.close();
                                cursor = null;
                            }
                            if (txn != null) {
                                txn.abort();
                                txn = null;
                            }
                        }
                        try {
                            Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                        } catch (final InterruptedException ie) {
                            // Ignore the error
                        }
                    }
                    if (exception != null) {
                        LOGGER.error("Unable to delete events from data base", exception);
                    }
                }
            } catch (final Exception ex) {
                LOGGER.error("Error reading database", ex);
                shutdown = true;
                throw ex;
            } finally {
                if (cursor != null) {
                    cursor.close();
                }
            }

            return errors;
        }