in bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java [956:1232]
public void run() {
LOG.info("Starting journal on {}", journalDirectory);
ThreadRegistry.register(journalThreadName);
if (conf.isBusyWaitEnabled()) {
try {
CpuAffinity.acquireCore();
} catch (Exception e) {
LOG.warn("Unable to acquire CPU core for Journal thread: {}", e.getMessage(), e);
}
}
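// Scratch state for the write loop: toFlush collects entries that have been
// written to the buffered channel but not yet flushed, lenBuff holds the 4-byte
// length prefix written before each entry, and paddingBuff is a zero-filled
// buffer used to pad flushed blocks to the journal alignment size (V5+ format).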
RecyclableArrayList<QueueEntry> toFlush = entryListRecycler.newInstance();
int numEntriesToFlush = 0;
ByteBuf lenBuff = Unpooled.buffer(4);
ByteBuf paddingBuff = Unpooled.buffer(2 * conf.getJournalAlignmentSize());
paddingBuff.writeZero(paddingBuff.capacity());
BufferedChannel bc = null;
JournalChannel logFile = null;
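// The force-write thread performs the actual sync-to-disk work asynchronously,
// so this thread can keep appending entries while earlier batches are synced.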
forceWriteThread.start();
Stopwatch journalCreationWatcher = Stopwatch.createUnstarted();
Stopwatch journalFlushWatcher = Stopwatch.createUnstarted();
long batchSize = 0;
try {
List<Long> journalIds = listJournalIds(journalDirectory, null);
// Should not use MathUtils.now(), which is based on System.nanoTime() and
// can only be used to measure elapsed time, not wall-clock time.
// http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29
long logId = journalIds.isEmpty() ? System.currentTimeMillis() : journalIds.get(journalIds.size() - 1);
long lastFlushPosition = 0;
boolean groupWhenTimeout = false;
long dequeueStartTime = 0L;
long lastFlushTimeMs = System.currentTimeMillis();
final ObjectHashSet<BookieRequestHandler> writeHandlers = new ObjectHashSet<>();
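// Local array for draining the shared entry queue in batches: takeAll/pollAll
// below move a whole batch of entries here, so the shared queue is accessed
// once per batch rather than once per entry.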
QueueEntry[] localQueueEntries = new QueueEntry[conf.getJournalQueueSize()];
int localQueueEntriesIdx = 0;
int localQueueEntriesLen = 0;
QueueEntry qe = null;
while (true) {
// new journal file to write
if (null == logFile) {
logId = logId + 1;
journalIds = listJournalIds(journalDirectory, null);
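// If the channel provider supports reusing files and journal file reuse is
// enabled, recycle the oldest journal file instead of creating a new one, but
// only when we already have at least maxBackupJournals files and the oldest
// one is entirely behind the last persisted log mark.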
Long replaceLogId = fileChannelProvider.supportReuseFile() && journalReuseFiles
&& journalIds.size() >= maxBackupJournals
&& journalIds.get(0) < lastLogMark.getCurMark().getLogFileId()
? journalIds.get(0) : null;
journalCreationWatcher.reset().start();
logFile = newLogFile(logId, replaceLogId);
journalStats.getJournalCreationStats().registerSuccessfulEvent(
journalCreationWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
bc = logFile.getBufferedChannel();
lastFlushPosition = bc.position();
}
if (qe == null) {
if (dequeueStartTime != 0) {
journalStats.getJournalProcessTimeStats()
.registerSuccessfulEvent(MathUtils.elapsedNanos(dequeueStartTime), TimeUnit.NANOSECONDS);
}
// At this point the local queue will always be empty, otherwise we would have
// advanced to the next `qe` at the end of the loop
localQueueEntriesIdx = 0;
if (numEntriesToFlush == 0) {
// There are no entries pending. We can wait indefinitely until the next
// one is available
localQueueEntriesLen = queue.takeAll(localQueueEntries);
} else {
// There are already some entries pending. We must adjust
// the waiting time to the remaining groupWait time
long pollWaitTimeNanos = maxGroupWaitInNanos
- MathUtils.elapsedNanos(toFlush.get(0).enqueueTime);
if (flushWhenQueueEmpty || pollWaitTimeNanos < 0) {
pollWaitTimeNanos = 0;
}
localQueueEntriesLen = queue.pollAll(localQueueEntries,
pollWaitTimeNanos, TimeUnit.NANOSECONDS);
}
dequeueStartTime = MathUtils.nowInNano();
if (localQueueEntriesLen > 0) {
qe = localQueueEntries[localQueueEntriesIdx];
localQueueEntries[localQueueEntriesIdx++] = null;
}
}
if (numEntriesToFlush > 0) {
boolean shouldFlush = false;
// We should issue a forceWrite if any of the three conditions below holds:
// 1. The oldest pending entry has been pending for longer than the max wait time
if (maxGroupWaitInNanos > 0 && !groupWhenTimeout && (MathUtils
.elapsedNanos(toFlush.get(0).enqueueTime) > maxGroupWaitInNanos)) {
groupWhenTimeout = true;
} else if (maxGroupWaitInNanos > 0 && groupWhenTimeout
&& (qe == null // no entry to group
|| MathUtils.elapsedNanos(qe.enqueueTime) < maxGroupWaitInNanos)) {
// Once the group wait has timed out, it is better to look ahead, as many
// entries may already have timed out due to a previous slow write (a
// filesystem write slowed down by a force write). Keep grouping entries
// from the queue that
// a) have already timed out, while
// b) limiting the number of entries grouped together.
groupWhenTimeout = false;
shouldFlush = true;
journalStats.getFlushMaxWaitCounter().inc();
} else if (qe != null
&& ((bufferedEntriesThreshold > 0 && toFlush.size() > bufferedEntriesThreshold)
|| (bc.position() > lastFlushPosition + bufferedWritesThreshold))) {
// 2. If we have buffered more than bufferedWritesThreshold bytes or more than bufferedEntriesThreshold entries
groupWhenTimeout = false;
shouldFlush = true;
journalStats.getFlushMaxOutstandingBytesCounter().inc();
} else if (qe == null && flushWhenQueueEmpty) {
// We should get here only if flushWhenQueueEmpty is true; otherwise we would wait
// for the timeout, which would put us past the maxWait threshold
// 3. If the queue is empty, i.e. there is no benefit to grouping. This happens when
// entries are published one at a time - the common case in tests.
groupWhenTimeout = false;
shouldFlush = true;
journalStats.getFlushEmptyQueueCounter().inc();
}
// toFlush is non-null and not empty, so it is safe to access its first element
if (shouldFlush) {
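// From the V5 format onwards, pad the block up to the journal alignment size
// before flushing, so that force writes land on aligned offsets.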
if (journalFormatVersionToWrite >= JournalChannel.V5) {
writePaddingBytes(logFile, paddingBuff, journalAlignmentSize);
}
journalFlushWatcher.reset().start();
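// Flush the buffered channel to the file (page cache); durability is only
// guaranteed once a force-write request syncs the file to disk.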
bc.flush();
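// Entries that do not require a data sync, or that are marked ackBeforeSync,
// can be completed right away without waiting for the force write.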
for (int i = 0; i < toFlush.size(); i++) {
QueueEntry entry = toFlush.get(i);
if (entry != null && (!syncData || entry.ackBeforeSync)) {
toFlush.set(i, null);
numEntriesToFlush--;
if (entry.getCtx() instanceof BookieRequestHandler
&& entry.entryId != BookieImpl.METAENTRY_ID_FORCE_LEDGER) {
writeHandlers.add((BookieRequestHandler) entry.getCtx());
}
entry.run();
}
}
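// Flush pending responses once per channel for the whole batch, rather than
// once per entry.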
writeHandlers.forEach(
(ObjectProcedure<? super BookieRequestHandler>)
BookieRequestHandler::flushPendingResponse);
writeHandlers.clear();
lastFlushPosition = bc.position();
journalStats.getJournalFlushStats().registerSuccessfulEvent(
journalFlushWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
// Trace the lifetime of entries through persistence
if (LOG.isDebugEnabled()) {
for (QueueEntry e : toFlush) {
if (e != null) {
LOG.debug("Written and queuing for flush Ledger: {} Entry: {}",
e.ledgerId, e.entryId);
}
}
}
journalStats.getForceWriteBatchEntriesStats()
.registerSuccessfulValue(numEntriesToFlush);
journalStats.getForceWriteBatchBytesStats()
.registerSuccessfulValue(batchSize);
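// Roll over to a new journal file once the flushed position exceeds the
// configured maximum journal size.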
boolean shouldRolloverJournal = (lastFlushPosition > maxJournalSize);
// Trigger a data sync to disk in the "Force-Write" thread.
// A data sync is triggered in three situations:
// 1. journalSyncData is enabled, usually when an SSD is used as journal storage
// 2. shouldRolloverJournal is true, i.e. the journal file has reached maxJournalSize
// 3. if journalSyncData is disabled and shouldRolloverJournal is false, we use
// journalPageCacheFlushIntervalMSec to control the sync frequency and avoid
// syncing the disk too often, which would increase disk I/O utilization.
// Once the interval since the last flush reaches journalPageCacheFlushIntervalMSec
// (default: 1s), a data sync to disk is triggered.
if (syncData
|| shouldRolloverJournal
|| (System.currentTimeMillis() - lastFlushTimeMs
>= journalPageCacheFlushIntervalMSec)) {
forceWriteRequests.put(createForceWriteRequest(logFile, logId, lastFlushPosition,
toFlush, shouldRolloverJournal));
lastFlushTimeMs = System.currentTimeMillis();
}
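// Start a fresh batch; when a force-write request was enqueued above, the old
// toFlush list travels with it and its remaining entries are completed there.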
toFlush = entryListRecycler.newInstance();
numEntriesToFlush = 0;
batchSize = 0L;
// check whether journal file is over file limit
if (shouldRolloverJournal) {
// if the journal file is rolled over, it will be closed after the last
// entry has been force written to disk.
logFile = null;
continue;
}
}
}
if (!running) {
LOG.info("Journal Manager is asked to shut down, quit.");
break;
}
if (qe == null) { // no more queue entry
continue;
}
journalStats.getJournalQueueSize().dec();
journalStats.getJournalQueueStats()
.registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime), TimeUnit.NANOSECONDS);
if ((qe.entryId == BookieImpl.METAENTRY_ID_LEDGER_EXPLICITLAC)
&& (journalFormatVersionToWrite < JournalChannel.V6)) {
/*
* this means we are using new code which supports
* persisting explicitLac, but "journalFormatVersionToWrite"
* is set to some older value (< V6). In this case we
* shouldn't write this special entry
* (METAENTRY_ID_LEDGER_EXPLICITLAC) to Journal.
*/
memoryLimitController.releaseMemory(qe.entry.readableBytes());
ReferenceCountUtil.release(qe.entry);
} else if (qe.entryId != BookieImpl.METAENTRY_ID_FORCE_LEDGER) {
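// Regular entries are serialized as a 4-byte length prefix followed by the
// entry payload. METAENTRY_ID_FORCE_LEDGER entries carry no payload and are
// only tracked in toFlush so that their callbacks fire after the flush.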
int entrySize = qe.entry.readableBytes();
journalStats.getJournalWriteBytes().addCount(entrySize);
batchSize += (4 + entrySize);
lenBuff.clear();
lenBuff.writeInt(entrySize);
// pre-allocate space in the journal file based on the record size (length prefix + entry)
logFile.preAllocIfNeeded(4 + entrySize);
bc.write(lenBuff);
bc.write(qe.entry);
memoryLimitController.releaseMemory(qe.entry.readableBytes());
ReferenceCountUtil.release(qe.entry);
}
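// Track the entry so its callback runs once this batch is flushed (and synced,
// if the entry requires it).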
toFlush.add(qe);
numEntriesToFlush++;
if (localQueueEntriesIdx < localQueueEntriesLen) {
qe = localQueueEntries[localQueueEntriesIdx];
localQueueEntries[localQueueEntriesIdx++] = null;
} else {
qe = null;
}
}
} catch (IOException ioe) {
LOG.error("I/O exception in Journal thread!", ioe);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.info("Journal exits when shutting down");
} finally {
// There could be packets queued for forceWrite on this logFile.
// That is fine, as an exception here is going to take down the
// bookie anyway. If we execute this as part of a graceful shutdown,
// close will flush the file system cache, making any previously
// cached writes durable, so this is fine as well.
IOUtils.close(LOG, bc);
if (journalAliveListener != null) {
journalAliveListener.onJournalExit();
}
}
LOG.info("Journal exited loop!");
}