in bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java [150:248]
private JournalChannel(File journalDirectory, long logId,
long preAllocSize, int writeBufferSize, int journalAlignSize,
long position, boolean fRemoveFromPageCache,
int formatVersionToWrite, Journal.BufferedChannelBuilder bcBuilder,
ServerConfiguration conf,
FileChannelProvider provider, Long toReplaceLogId) throws IOException {
this.journalAlignSize = journalAlignSize;
this.zeros = ByteBuffer.allocate(journalAlignSize);
this.preAllocSize = preAllocSize - preAllocSize % journalAlignSize;
this.fRemoveFromPageCache = fRemoveFromPageCache;
this.configuration = conf;
boolean reuseFile = false;
File fn = new File(journalDirectory, Long.toHexString(logId) + ".txn");
if (toReplaceLogId != null && logId != toReplaceLogId && provider.supportReuseFile()) {
File toReplaceFile = new File(journalDirectory, Long.toHexString(toReplaceLogId) + ".txn");
if (toReplaceFile.exists()) {
renameJournalFile(toReplaceFile, fn);
provider.notifyRename(toReplaceFile, fn);
reuseFile = true;
}
}
channel = provider.open(fn, configuration);
if (formatVersionToWrite < V4) {
throw new IOException("Invalid journal format to write : version = " + formatVersionToWrite);
}
LOG.info("Opening journal {}", fn);
if (!channel.fileExists(fn)) { // create new journal file to write, write version
if (!fn.createNewFile()) {
LOG.error("Journal file {}, that shouldn't exist, already exists. "
+ " is there another bookie process running?", fn);
throw new IOException("File " + fn
+ " suddenly appeared, is another bookie process running?");
}
fc = channel.getFileChannel();
formatVersion = formatVersionToWrite;
writeHeader(bcBuilder, writeBufferSize);
} else if (reuseFile) { // Open an existing journal to write, it needs fileChannelProvider support reuse file.
fc = channel.getFileChannel();
formatVersion = formatVersionToWrite;
writeHeader(bcBuilder, writeBufferSize);
} else { // open an existing file to read.
fc = channel.getFileChannel();
// readonly, use fileChannel directly, no need to use BufferedChannel
ByteBuffer bb = ByteBuffer.allocate(VERSION_HEADER_SIZE);
int c = fc.read(bb);
bb.flip();
if (c == VERSION_HEADER_SIZE) {
byte[] first4 = new byte[4];
bb.get(first4);
if (Arrays.equals(first4, magicWord)) {
formatVersion = bb.getInt();
} else {
// pre magic word journal, reset to 0;
formatVersion = V1;
}
} else {
// no header, must be old version
formatVersion = V1;
}
if (formatVersion < MIN_COMPAT_JOURNAL_FORMAT_VERSION
|| formatVersion > CURRENT_JOURNAL_FORMAT_VERSION) {
String err = String.format("Invalid journal version, unable to read."
+ " Expected between (%d) and (%d), got (%d)",
MIN_COMPAT_JOURNAL_FORMAT_VERSION, CURRENT_JOURNAL_FORMAT_VERSION,
formatVersion);
LOG.error(err);
throw new IOException(err);
}
try {
if (position == START_OF_FILE) {
if (formatVersion >= V5) {
fc.position(HEADER_SIZE);
} else if (formatVersion >= V2) {
fc.position(VERSION_HEADER_SIZE);
} else {
fc.position(0);
}
} else {
fc.position(position);
}
} catch (IOException e) {
LOG.error("Bookie journal file can seek to position :", e);
throw e;
}
}
if (fRemoveFromPageCache) {
this.fd = PageCacheUtil.getSysFileDescriptor(channel.getFD());
} else {
this.fd = -1;
}
}