public void run()

in log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java [509:648]


        /**
         * Main loop of the writer thread.
         *
         * <p>Until {@code shutdown} is set, each pass reads the number of stored events from the
         * database and either sends them or waits on the gate:
         * <ul>
         *   <li>If at least {@code batchSize} events are stored, or at least one event is stored and
         *       the batch delay has expired, the events are sent. With {@code batchSize > 1} this is
         *       delegated to {@code sendBatch}; otherwise events are read one at a time through a
         *       transactional cursor, sent with {@code manager.doSend} and deleted.</li>
         *   <li>Otherwise the thread waits on the gate for the time remaining until the next batch
         *       is due.</li>
         * </ul>
         *
         * <p>After the loop ends, and only when {@code batchSize > 1}, one more {@code sendBatch} is
         * attempted if the database still reports stored events.
         */
        public void run() {
            LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
            long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
            while (!shutdown) {
                final long nowMillis = System.currentTimeMillis();
                final long dbCount = database.count();
                // Re-synchronise the shared counter with the real database size on every pass.
                dbCounter.set(dbCount);
                // Send when a full batch is stored, or when anything is stored and the delay expired.
                if (dbCount >= batchSize || (dbCount > 0 && nextBatchMillis <= nowMillis)) {
                    nextBatchMillis = nowMillis + manager.getDelayMillis();
                    try {
                        boolean errors = false;
                        final DatabaseEntry key = new DatabaseEntry();
                        final DatabaseEntry data = new DatabaseEntry();

                        // Close the gate before draining so that a later waitForOpen() blocks.
                        // NOTE(review): presumably whoever stores a new event re-opens it - confirm.
                        gate.close();
                        if (batchSize > 1) {
                            try {
                                errors = sendBatch(key, data);
                            } catch (final Exception ex) {
                                // Previously swallowed silently; record why the loop is being left.
                                LOGGER.warn("Unable to send batch; WriterThread is stopping", ex);
                                break;
                            }
                        } else {
                            // Holds the lock conflict of the most recent attempt, if any; it stays
                            // non-null only when every retry ended in a lock conflict.
                            Exception exception = null;
                            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                                exception = null;
                                Transaction txn = null;
                                Cursor cursor = null;
                                try {
                                    txn = environment.beginTransaction(null, null);
                                    cursor = database.openCursor(txn, null);
                                    try {
                                        OperationStatus status = cursor.getFirst(key, data, LockMode.RMW);
                                        while (status == OperationStatus.SUCCESS) {
                                            final SimpleEvent event = createEvent(data);
                                            if (event != null) {
                                                try {
                                                    manager.doSend(event);
                                                } catch (final Exception ioe) {
                                                    // Stop draining; events deleted so far are still
                                                    // committed below, the failed one stays stored.
                                                    errors = true;
                                                    LOGGER.error("Error sending event", ioe);
                                                    break;
                                                }
                                                try {
                                                    cursor.delete();
                                                } catch (final Exception ex) {
                                                    LOGGER.error("Unable to delete event", ex);
                                                }
                                            }
                                            status = cursor.getNext(key, data, LockMode.RMW);
                                        }
                                        // The cursor is closed before the commit. The locals are
                                        // nulled so the finally block does not close/abort again.
                                        cursor.close();
                                        cursor = null;
                                        txn.commit();
                                        txn = null;
                                        // NOTE(review): decremented once per committed transaction
                                        // even though the cursor loop may send several events (or
                                        // stop on a send error); the counter is reset from
                                        // database.count() at the top of the next pass.
                                        dbCounter.decrementAndGet();
                                        break;
                                    } catch (final LockConflictException lce) {
                                        exception = lce;
                                        // Fall through and retry.
                                    } catch (final Exception ex) {
                                        LOGGER.error("Error reading or writing to database", ex);
                                        shutdown = true;
                                        break;
                                    } finally {
                                        if (cursor != null) {
                                            cursor.close();
                                            cursor = null;
                                        }
                                        if (txn != null) {
                                            txn.abort();
                                            txn = null;
                                        }
                                    }
                                } catch (final LockConflictException lce) {
                                    // Lock conflict raised while opening the transaction/cursor or
                                    // while cleaning up in the finally block above: clean up on a
                                    // best-effort basis and retry.
                                    exception = lce;
                                    if (cursor != null) {
                                        try {
                                            cursor.close();
                                            cursor = null;
                                        } catch (final Exception ex) {
                                            LOGGER.trace("Ignored exception closing cursor during lock conflict.", ex);
                                        }
                                    }
                                    if (txn != null) {
                                        try {
                                            txn.abort();
                                            txn = null;
                                        } catch (final Exception ex) {
                                            LOGGER.trace("Ignored exception aborting tx during lock conflict.", ex);
                                        }
                                    }
                                }
                                // Only reached after a lock conflict: pause before the next attempt.
                                try {
                                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                                } catch (final InterruptedException ie) {
                                    // Ignore the error
                                    // NOTE(review): the interrupt status is not restored here, so
                                    // the retry loop carries on - confirm this is intended.
                                }
                            }
                            if (exception != null) {
                                LOGGER.error("Unable to read or update data base", exception);
                            }
                        }
                        if (errors) {
                            // Back off for one delay period before looking at the database again.
                            Thread.sleep(manager.getDelayMillis());
                            continue;
                        }
                    } catch (final Exception ex) {
                        LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
                    }
                } else {
                    // Nothing is due yet: wait on the gate for the rest of the delay period.
                    if (nextBatchMillis <= nowMillis) {
                        nextBatchMillis = nowMillis + manager.getDelayMillis();
                    }
                    try {
                        final long interval = nextBatchMillis - nowMillis;
                        gate.waitForOpen(interval);
                    } catch (final InterruptedException ie) {
                        LOGGER.warn("WriterThread interrupted, continuing");
                    } catch (final Exception ex) {
                        LOGGER.error("WriterThread encountered an exception waiting for work", ex);
                        break;
                    }
                }
            }

            // In batch mode, make one last attempt to send whatever is still stored.
            if (batchSize > 1 && database.count() > 0) {
                final DatabaseEntry key = new DatabaseEntry();
                final DatabaseEntry data = new DatabaseEntry();
                try {
                    sendBatch(key, data);
                } catch (final Exception ex) {
                    LOGGER.warn("Unable to write final batch", ex);
                }
            }
            LOGGER.trace("WriterThread exiting");
        }