in activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java [251:361]
public synchronized void start() throws IOException {
    if (started) {
        return;
    }
    long start = System.currentTimeMillis();
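    // Set up the pool of read accessors and the configured appender implementation.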
    accessorPool = new DataFileAccessorPool(this);
    started = true;
    appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);
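    // Scan the journal directory for existing data files named <filePrefix><id><fileSuffix>.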
    File[] files = directory.listFiles(new FilenameFilter() {
        @Override
        public boolean accept(File dir, String n) {
            return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
        }
    });
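    // Index each discovered data file by its numeric id and add its length to the journal's running total.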
    if (files != null) {
        for (File file : files) {
            try {
                String n = file.getName();
                String numStr = n.substring(filePrefix.length(), n.length() - fileSuffix.length());
                int num = Integer.parseInt(numStr);
                DataFile dataFile = new DataFile(file, num);
                fileMap.put(dataFile.getDataFileId(), dataFile);
                totalLength.addAndGet(dataFile.getLength());
            } catch (NumberFormatException e) {
                // Ignore files that do not match the pattern.
            }
        }
        // Sort the list so that we can link the DataFiles together in the right order.
        LinkedList<DataFile> l = new LinkedList<>(fileMap.values());
        Collections.sort(l);
        for (DataFile df : l) {
            if (df.getLength() == 0) {
                // possibly the result of a previous failed write
                LOG.info("ignoring zero length, partially initialised journal data file: {}", df);
                continue;
            } else if (l.getLast().equals(df) && isUnusedPreallocated(df)) {
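                // Skip a trailing file that was preallocated but never written to.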
                continue;
            }
            dataFiles.addLast(df);
            fileByFileMap.put(df.getFile(), df);
            if (isCheckForCorruptionOnStartup()) {
                lastAppendLocation.set(recoveryCheck(df));
            }
        }
    }
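    // Prepare resources for the configured preallocation strategy: a template file for
    // OS kernel copy, or a direct buffer for chunked/full zeroing. Sparse files need no preparation.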
    if (preallocationScope != PreallocationScope.NONE) {
        switch (preallocationStrategy) {
            case SPARSE_FILE:
                break;
            case OS_KERNEL_COPY: {
                osKernelCopyTemplateFile = createJournalTemplateFile();
            }
            break;
            case CHUNKED_ZEROS: {
                preAllocateDirectBuffer = allocateDirectBuffer(PREALLOC_CHUNK_SIZE);
            }
            break;
            case ZEROS: {
                preAllocateDirectBuffer = allocateDirectBuffer(getMaxFileLength());
            }
            break;
        }
    }
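    // Single-threaded daemon scheduler used for the periodic cleanup task scheduled below.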
    scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread schedulerThread = new Thread(r);
            schedulerThread.setName("ActiveMQ Journal Scheduled executor");
            schedulerThread.setDaemon(true);
            return schedulerThread;
        }
    });

    // init current write file
    if (dataFiles.isEmpty()) {
        nextDataFileId = 1;
        rotateWriteFile();
    } else {
        currentDataFile.set(dataFiles.getTail());
        nextDataFileId = currentDataFile.get().dataFileId + 1;
    }
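    // If the corruption check above did not already establish it, recover the last append
    // location from the tail data file.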
    if (lastAppendLocation.get() == null) {
        DataFile df = dataFiles.getTail();
        lastAppendLocation.set(recoveryCheck(df));
    }

    // ensure we don't report unused space of last journal file in size metric
    int lastFileLength = dataFiles.getTail().getLength();
    if (totalLength.get() > lastFileLength && lastAppendLocation.get().getOffset() > 0) {
        totalLength.addAndGet(lastAppendLocation.get().getOffset() - lastFileLength);
    }
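    // Schedule the recurring cleanup task at the configured interval.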
    cleanupTask = scheduler.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            cleanup();
        }
    }, cleanupInterval, cleanupInterval, TimeUnit.MILLISECONDS);

    long end = System.currentTimeMillis();
    LOG.trace("Startup took: {} ms", end - start);
}