public void run()

in bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java [956:1232]

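Main loop of the journal write thread: it rolls journal files as needed, drains queued entries in batches, writes them through the journal's BufferedChannel, and flushes / hands batches to the force-write thread according to the group-commit conditions described in the comments below.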

    public void run() {
        LOG.info("Starting journal on {}", journalDirectory);
        ThreadRegistry.register(journalThreadName);

        if (conf.isBusyWaitEnabled()) {
            try {
                CpuAffinity.acquireCore();
            } catch (Exception e) {
                LOG.warn("Unable to acquire CPU core for Journal thread: {}", e.getMessage(), e);
            }
        }

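        // Buffers reused across iterations: the list of entries written but not yet flushed or
        // acknowledged (toFlush), a 4-byte length-prefix buffer, and a zero-filled padding buffer
        // used to align journal writes.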
        RecyclableArrayList<QueueEntry> toFlush = entryListRecycler.newInstance();
        int numEntriesToFlush = 0;
        ByteBuf lenBuff = Unpooled.buffer(4);
        ByteBuf paddingBuff = Unpooled.buffer(2 * conf.getJournalAlignmentSize());
        paddingBuff.writeZero(paddingBuff.capacity());

        BufferedChannel bc = null;
        JournalChannel logFile = null;
        forceWriteThread.start();
        Stopwatch journalCreationWatcher = Stopwatch.createUnstarted();
        Stopwatch journalFlushWatcher = Stopwatch.createUnstarted();
        long batchSize = 0;
        try {
            List<Long> journalIds = listJournalIds(journalDirectory, null);
            // Should not use MathUtils.now(), which is based on System.nanoTime() and
            // can only be used to measure elapsed time, not wall-clock time.
            // http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29
            long logId = journalIds.isEmpty() ? System.currentTimeMillis() : journalIds.get(journalIds.size() - 1);
            long lastFlushPosition = 0;
            boolean groupWhenTimeout = false;

            long dequeueStartTime = 0L;
            long lastFlushTimeMs = System.currentTimeMillis();

            final ObjectHashSet<BookieRequestHandler> writeHandlers = new ObjectHashSet<>();
            QueueEntry[] localQueueEntries = new QueueEntry[conf.getJournalQueueSize()];
            int localQueueEntriesIdx = 0;
            int localQueueEntriesLen = 0;
            QueueEntry qe = null;
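            // Main write loop: roll to a new journal file when needed, drain entries from the queue,
            // write them to the buffered channel, and flush / hand off to the force-write thread
            // according to the group-commit conditions below.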
            while (true) {
                // new journal file to write
                if (null == logFile) {
                    logId = logId + 1;
                    journalIds = listJournalIds(journalDirectory, null);
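                    // When the provider supports file reuse and journal reuse is enabled, recycle the
                    // oldest journal file once maxBackupJournals is reached, provided that file is
                    // already older than the current last log mark.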
                    Long replaceLogId = fileChannelProvider.supportReuseFile() && journalReuseFiles
                        && journalIds.size() >= maxBackupJournals
                        && journalIds.get(0) < lastLogMark.getCurMark().getLogFileId()
                        ? journalIds.get(0) : null;

                    journalCreationWatcher.reset().start();
                    logFile = newLogFile(logId, replaceLogId);

                    journalStats.getJournalCreationStats().registerSuccessfulEvent(
                            journalCreationWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);

                    bc = logFile.getBufferedChannel();

                    lastFlushPosition = bc.position();
                }

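                // Refill the local batch from the journal queue once the previous batch has been
                // fully consumed (qe == null).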
                if (qe == null) {
                    if (dequeueStartTime != 0) {
                        journalStats.getJournalProcessTimeStats()
                            .registerSuccessfulEvent(MathUtils.elapsedNanos(dequeueStartTime), TimeUnit.NANOSECONDS);
                    }

                    // At this point the local queue will always be empty, otherwise we would have
                    // advanced to the next `qe` at the end of the loop
                    localQueueEntriesIdx = 0;
                    if (numEntriesToFlush == 0) {
                        // There are no entries pending. We can wait indefinitely until the next
                        // one is available
                        localQueueEntriesLen = queue.takeAll(localQueueEntries);
                    } else {
                        // There are already some entries pending. We must adjust
                        // the waiting time to the remaining groupWait time
                        long pollWaitTimeNanos = maxGroupWaitInNanos
                                - MathUtils.elapsedNanos(toFlush.get(0).enqueueTime);
                        if (flushWhenQueueEmpty || pollWaitTimeNanos < 0) {
                            pollWaitTimeNanos = 0;
                        }

                        localQueueEntriesLen = queue.pollAll(localQueueEntries,
                                pollWaitTimeNanos, TimeUnit.NANOSECONDS);
                    }

                    dequeueStartTime = MathUtils.nowInNano();

                    if (localQueueEntriesLen > 0) {
                        qe = localQueueEntries[localQueueEntriesIdx];
                        localQueueEntries[localQueueEntriesIdx++] = null;
                    }
                }

                if (numEntriesToFlush > 0) {
                    boolean shouldFlush = false;
                    // We should issue a forceWrite if any of the three conditions below holds:
                    // 1. The oldest pending entry has been pending for longer than the max wait time
                    if (maxGroupWaitInNanos > 0 && !groupWhenTimeout && (MathUtils
                            .elapsedNanos(toFlush.get(0).enqueueTime) > maxGroupWaitInNanos)) {
                        groupWhenTimeout = true;
                    } else if (maxGroupWaitInNanos > 0 && groupWhenTimeout
                        && (qe == null // no entry to group
                            || MathUtils.elapsedNanos(qe.enqueueTime) < maxGroupWaitInNanos)) {
                        // When the group wait times out, it is better to look ahead, since many entries
                        // may already have timed out due to a previous slow write (e.g. a filesystem
                        // write delayed by a force write). Group the entries in the queue that
                        // a) have already timed out, while
                        // b) limiting the number of entries grouped together.
                        groupWhenTimeout = false;
                        shouldFlush = true;
                        journalStats.getFlushMaxWaitCounter().inc();
                    } else if (qe != null
                            && ((bufferedEntriesThreshold > 0 && toFlush.size() > bufferedEntriesThreshold)
                            || (bc.position() > lastFlushPosition + bufferedWritesThreshold))) {
                        // 2. We have buffered more than bufferedWritesThreshold bytes or bufferedEntriesThreshold entries
                        groupWhenTimeout = false;
                        shouldFlush = true;
                        journalStats.getFlushMaxOutstandingBytesCounter().inc();
                    } else if (qe == null && flushWhenQueueEmpty) {
                        // We should get here only if flushWhenQueueEmpty is true; otherwise we would wait
                        // for the timeout, which would put us past the maxWait threshold.
                        // 3. The queue is empty, i.e. there is no benefit in grouping. This happens when we
                        // have one publish at a time - the common case in tests.
                        groupWhenTimeout = false;
                        shouldFlush = true;
                        journalStats.getFlushEmptyQueueCounter().inc();
                    }

                    // toFlush is non-null and not empty, so it is safe to access its first element
                    if (shouldFlush) {
                        if (journalFormatVersionToWrite >= JournalChannel.V5) {
                            writePaddingBytes(logFile, paddingBuff, journalAlignmentSize);
                        }
                        journalFlushWatcher.reset().start();
                        bc.flush();

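                        // Acknowledge entries that do not require a durable sync (journal sync disabled
                        // or ackBeforeSync requested) as soon as the buffered channel has been flushed;
                        // the remaining entries are passed to the force-write request created below.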
                        for (int i = 0; i < toFlush.size(); i++) {
                            QueueEntry entry = toFlush.get(i);
                            if (entry != null && (!syncData || entry.ackBeforeSync)) {
                                toFlush.set(i, null);
                                numEntriesToFlush--;
                                if (entry.getCtx() instanceof BookieRequestHandler
                                        && entry.entryId != BookieImpl.METAENTRY_ID_FORCE_LEDGER) {
                                    writeHandlers.add((BookieRequestHandler) entry.getCtx());
                                }
                                entry.run();
                            }
                        }
                        writeHandlers.forEach(
                                (ObjectProcedure<? super BookieRequestHandler>)
                                        BookieRequestHandler::flushPendingResponse);
                        writeHandlers.clear();

                        lastFlushPosition = bc.position();
                        journalStats.getJournalFlushStats().registerSuccessfulEvent(
                                journalFlushWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);

                        // Trace the lifetime of entries through persistence
                        if (LOG.isDebugEnabled()) {
                            for (QueueEntry e : toFlush) {
                                if (e != null) {
                                    LOG.debug("Written and queuing for flush Ledger: {}  Entry: {}",
                                              e.ledgerId, e.entryId);
                                }
                            }
                        }

                        journalStats.getForceWriteBatchEntriesStats()
                            .registerSuccessfulValue(numEntriesToFlush);
                        journalStats.getForceWriteBatchBytesStats()
                            .registerSuccessfulValue(batchSize);
                        boolean shouldRolloverJournal = (lastFlushPosition > maxJournalSize);
                        // Trigger the data sync to disk in the "Force-Write" thread.
                        // A sync is triggered in any of three situations:
                        // 1. journalSyncData is enabled, usually when an SSD is used as journal storage
                        // 2. shouldRolloverJournal is true, i.e. the journal file has reached maxJournalSize
                        // 3. journalSyncData is disabled and shouldRolloverJournal is false; in that case
                        //    journalPageCacheFlushIntervalMSec controls the sync frequency, preventing
                        //    overly frequent disk syncs that would increase disk I/O utilization. When the
                        //    time since the last flush reaches journalPageCacheFlushIntervalMSec
                        //    (default: 1s), a data sync to disk is triggered.
                        if (syncData
                                || shouldRolloverJournal
                                || (System.currentTimeMillis() - lastFlushTimeMs
                                >= journalPageCacheFlushIntervalMSec)) {
                            forceWriteRequests.put(createForceWriteRequest(logFile, logId, lastFlushPosition,
                                    toFlush, shouldRolloverJournal));
                            lastFlushTimeMs = System.currentTimeMillis();
                        }
                        toFlush = entryListRecycler.newInstance();
                        numEntriesToFlush = 0;

                        batchSize = 0L;
                        // check whether the journal file is over the size limit
                        if (shouldRolloverJournal) {
                            // if the journal file is rolled over, it will be closed after the last
                            // entry is force-written to disk.
                            logFile = null;
                            continue;
                        }
                    }
                }

                if (!running) {
                    LOG.info("Journal Manager is asked to shut down, quit.");
                    break;
                }

                if (qe == null) { // no more queue entry
                    continue;
                }

                journalStats.getJournalQueueSize().dec();
                journalStats.getJournalQueueStats()
                        .registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime), TimeUnit.NANOSECONDS);

                if ((qe.entryId == BookieImpl.METAENTRY_ID_LEDGER_EXPLICITLAC)
                        && (journalFormatVersionToWrite < JournalChannel.V6)) {
                    /*
                     * this means we are using new code which supports
                     * persisting explicitLac, but "journalFormatVersionToWrite"
                     * is set to some older value (< V6). In this case we
                     * shouldn't write this special entry
                     * (METAENTRY_ID_LEDGER_EXPLICITLAC) to Journal.
                     */
                    memoryLimitController.releaseMemory(qe.entry.readableBytes());
                    ReferenceCountUtil.release(qe.entry);
                } else if (qe.entryId != BookieImpl.METAENTRY_ID_FORCE_LEDGER) {
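                    // Normal entry: account for its size, write a 4-byte length prefix followed by
                    // the entry payload to the journal's buffered channel, then release the buffer.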
                    int entrySize = qe.entry.readableBytes();
                    journalStats.getJournalWriteBytes().addCount(entrySize);

                    batchSize += (4 + entrySize);

                    lenBuff.clear();
                    lenBuff.writeInt(entrySize);

                    // preAlloc based on size
                    logFile.preAllocIfNeeded(4 + entrySize);

                    bc.write(lenBuff);
                    bc.write(qe.entry);
                    memoryLimitController.releaseMemory(qe.entry.readableBytes());
                    ReferenceCountUtil.release(qe.entry);
                }

                toFlush.add(qe);
                numEntriesToFlush++;

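                // Advance to the next entry drained from the queue, or clear qe so the next
                // iteration refills the local batch.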
                if (localQueueEntriesIdx < localQueueEntriesLen) {
                    qe = localQueueEntries[localQueueEntriesIdx];
                    localQueueEntries[localQueueEntriesIdx++] = null;
                } else {
                    qe = null;
                }
            }
        } catch (IOException ioe) {
            LOG.error("I/O exception in Journal thread!", ioe);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            LOG.info("Journal exits when shutting down");
        } finally {
            // There could be packets queued for forceWrite on this logFile.
            // That is fine, since an exception here will take down the bookie anyway.
            // If this runs as part of a graceful shutdown, close will flush the
            // file system cache, making any previously cached writes durable, so
            // this is fine as well.
            IOUtils.close(LOG, bc);
            if (journalAliveListener != null) {
                journalAliveListener.onJournalExit();
            }
        }
        LOG.info("Journal exited loop!");
    }
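
For reference, a minimal standalone sketch of the group-commit decision implemented in the loop above. The helper name and its parameters are illustrative stand-ins that mirror the Journal fields referenced in the method (they are not part of the actual Journal API), and the sketch omits the groupWhenTimeout look-ahead refinement:

    // Decide whether the buffered batch should be flushed and handed to the force-write thread.
    // All parameters are hypothetical stand-ins for the corresponding Journal fields.
    static boolean shouldFlushBatch(int pendingEntries, long oldestEnqueueNanos, long nowNanos,
                                    boolean incomingQueueEmpty, boolean flushWhenQueueEmpty,
                                    long bufferedBytes, long maxGroupWaitNanos,
                                    int bufferedEntriesThreshold, long bufferedWritesThreshold) {
        if (pendingEntries == 0) {
            return false; // nothing pending, nothing to flush
        }
        // 1. The oldest pending entry has waited longer than the maximum group-wait time.
        if (maxGroupWaitNanos > 0 && nowNanos - oldestEnqueueNanos > maxGroupWaitNanos) {
            return true;
        }
        // 2. Enough entries or bytes have accumulated since the last flush.
        if ((bufferedEntriesThreshold > 0 && pendingEntries > bufferedEntriesThreshold)
                || bufferedBytes > bufferedWritesThreshold) {
            return true;
        }
        // 3. The incoming queue is empty and the journal is configured to flush in that case.
        return incomingQueueEmpty && flushWhenQueueEmpty;
    }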