in artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java [590:900]
/**
 * Reads one journal file fully into memory, scans it for valid records and reports each of them to
 * {@code reader}.
 * <p>
 * The scan is tolerant to damaged data: whenever a candidate record fails a validation step the scan
 * resumes one byte after the position where that candidate started. Most (not all) validation failures
 * also call {@link JournalReaderCallback#markAsDataFile(JournalFile)}.
 * <p>
 * The file is opened at the start and closed (best effort) before returning, on both the normal and the
 * exceptional path.
 *
 * @param fileFactory              used to release the read buffer when {@code wholeFileBufferReference} is
 *                                 {@code null}; also handed to {@code allocateDirectBufferIfNeeded}
 * @param file                     the journal file to read
 * @param reader                   callback receiving every valid record; {@code done()} is invoked once the
 *                                 scan reaches the end of the file
 * @param wholeFileBufferReference handed to {@code allocateDirectBufferIfNeeded}; when {@code null} the
 *                                 buffer obtained for this call is released before returning
 *                                 (presumably a holder that lets callers reuse the buffer - confirm
 *                                 against {@code allocateDirectBufferIfNeeded})
 * @param reclaimed                when {@code true}, records whose stored file id differs from
 *                                 {@code file.getRecordID()} are still processed instead of being skipped
 * @param replaceableRecords       optional (may be {@code null}) set of user record types; membership of a
 *                                 record's user type is forwarded in the {@link RecordInfo} built for
 *                                 event and update records
 * @return the buffer position right after the last valid record (or {@code JournalImpl.SIZE_HEADER} when
 *         no valid record was found), or {@code -1} when the file is smaller than the header
 * @throws Exception wrapping any {@link Throwable} raised while reading or dispatching records
 */
public static int readJournalFile(final SequentialFileFactory fileFactory,
                                  final JournalFile file,
                                  final JournalReaderCallback reader,
                                  final AtomicReference<ByteBuffer> wholeFileBufferReference,
                                  boolean reclaimed,
                                  ByteObjectHashMap<Boolean> replaceableRecords) throws Exception {
   file.getFile().open(1, false);
   ByteBuffer wholeFileBuffer = null;
   try {
      final int filesize = (int) file.getFile().size();
      if (filesize < JournalImpl.SIZE_HEADER) {
         // the file is damaged or the system crashed before it was able to write the header
         return -1;
      }
      wholeFileBuffer = allocateDirectBufferIfNeeded(fileFactory, filesize, wholeFileBufferReference);
      final int journalFileSize = file.getFile().read(wholeFileBuffer);
      if (journalFileSize != filesize) {
         throw new RuntimeException("Invalid read! The system couldn't read the entire file into memory");
      }
      // Skip the file header, records start right after it
      wholeFileBuffer.position(JournalImpl.SIZE_HEADER);
      int lastDataPos = JournalImpl.SIZE_HEADER;
      while (wholeFileBuffer.hasRemaining()) {
         // Start of the candidate record; every failed validation below resumes scanning at pos + 1
         final int pos = wholeFileBuffer.position();
         byte recordType = wholeFileBuffer.get();
         if (recordType < JournalImpl.EVENT_RECORD || recordType > JournalImpl.ROLLBACK_RECORD) {
            // I - We scan for any valid record on the file. If a hole
            // happened on the middle of the file we keep looking until all
            // the possibilities are gone
            continue;
         }
         if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT)) {
            reader.markAsDataFile(file);
            wholeFileBuffer.position(pos + 1);
            // II - Ignore this record, let's keep looking
            continue;
         }
         // III - Every record has the file-id.
         // This is what supports us from not re-filling the whole file
         int readFileId = wholeFileBuffer.getInt();
         // This record is from a previous file-usage. The file was
         // reused and we need to ignore this record
         if (readFileId != file.getRecordID() && !reclaimed) {
            wholeFileBuffer.position(pos + 1);
            continue;
         }
         short compactCount = 0;
         // The compact count byte is only read for journal version 2 and later
         if (file.getJournalVersion() >= 2) {
            if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_BYTE)) {
               reader.markAsDataFile(file);
               wholeFileBuffer.position(pos + 1);
               continue;
            }
            compactCount = wholeFileBuffer.get();
         }
         long transactionID = 0;
         if (JournalImpl.isTransaction(recordType)) {
            if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_LONG)) {
               wholeFileBuffer.position(pos + 1);
               reader.markAsDataFile(file);
               continue;
            }
            transactionID = wholeFileBuffer.getLong();
         }
         long recordID = 0;
         // Records that are NOT "complete transaction" markers carry a record id
         // (presumably prepare/commit/rollback are the markers - they never use recordID below)
         if (!JournalImpl.isCompleteTransaction(recordType)) {
            if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_LONG)) {
               wholeFileBuffer.position(pos + 1);
               reader.markAsDataFile(file);
               continue;
            }
            recordID = wholeFileBuffer.getLong();
         }
         // We use the size of the record to validate the health of the
         // record.
         // (V) We verify the size of the record
         // The variable record portion, read for every record type that contains a body
         int variableSize = 0;
         // Used to hold extra data on transaction prepares
         int preparedTransactionExtraDataSize = 0;
         byte userRecordType = 0;
         byte[] record = null;
         if (JournalImpl.isContainsBody(recordType)) {
            if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT)) {
               wholeFileBuffer.position(pos + 1);
               reader.markAsDataFile(file);
               continue;
            }
            variableSize = wholeFileBuffer.getInt();
            // Transactional deletes have a body but no user record type byte
            if (recordType != JournalImpl.DELETE_RECORD_TX) {
               if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), 1)) {
                  wholeFileBuffer.position(pos + 1);
                  continue;
               }
               userRecordType = wholeFileBuffer.get();
            }
            if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), variableSize)) {
               wholeFileBuffer.position(pos + 1);
               continue;
            }
            record = new byte[variableSize];
            wholeFileBuffer.get(record);
         }
         // For prepare and commit records: the record count stored in the record itself,
         // forwarded to the reader
         int transactionCheckNumberOfRecords = 0;
         if (recordType == JournalImpl.PREPARE_RECORD || recordType == JournalImpl.COMMIT_RECORD) {
            if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT)) {
               wholeFileBuffer.position(pos + 1);
               continue;
            }
            transactionCheckNumberOfRecords = wholeFileBuffer.getInt();
            if (recordType == JournalImpl.PREPARE_RECORD) {
               if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT)) {
                  wholeFileBuffer.position(pos + 1);
                  continue;
               }
               // Add the variable size required for preparedTransactions
               preparedTransactionExtraDataSize = wholeFileBuffer.getInt();
            }
            variableSize = 0;
         }
         int recordSize = JournalImpl.getRecordSize(recordType, file.getJournalVersion());
         // VI - this is completing V, We will validate the size at the end
         // of the record,
         // But we avoid buffer overflows by damaged data.
         // A negative extra-data size is rejected explicitly: the summed size could still be positive and
         // pass the checks below, and allocating the extra-data array would then throw
         // NegativeArraySizeException and abort the whole load instead of skipping the damaged record.
         if (preparedTransactionExtraDataSize < 0 ||
            JournalImpl.isInvalidSize(journalFileSize, pos, recordSize + variableSize + preparedTransactionExtraDataSize)) {
            // Avoid a buffer overflow caused by damaged data... continue
            // scanning for more records...
            if (logger.isTraceEnabled()) {
               logger.trace("Record at position {} recordType = {} file:{} recordSize: {} variableSize: {} preparedTransactionExtraDataSize: {} is corrupted and it is being ignored (II)",
                            pos, recordType, file.getFile().getFileName(), recordSize, variableSize, preparedTransactionExtraDataSize);
            }
            // If a file has damaged records, we make it a dataFile, and the
            // next reclaiming will fix it
            reader.markAsDataFile(file);
            wholeFileBuffer.position(pos + 1);
            continue;
         }
         // Remember where the payload parsing stopped, then peek at the trailing checkSize
         int oldPos = wholeFileBuffer.position();
         wholeFileBuffer.position(pos + variableSize +
                                     recordSize +
                                     preparedTransactionExtraDataSize - DataConstants.SIZE_INT);
         int checkSize = wholeFileBuffer.getInt();
         // VII - The checkSize at the end has to match with the size
         // informed at the beginning.
         // This is like testing a hash for the record. (We could replace the
         // checkSize by some sort of calculated hash)
         if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize) {
            if (logger.isTraceEnabled()) {
               logger.trace("Record at position {} recordType = {} possible transactionID = {} possible recordID = {} file:{} is corrupted and it is being ignored (III)",
                            pos, recordType, transactionID, recordID, file.getFile().getFileName());
            }
            // If a file has damaged records, we make it a dataFile, and the
            // next reclaiming will fix it
            reader.markAsDataFile(file);
            wholeFileBuffer.position(pos + DataConstants.SIZE_BYTE);
            continue;
         }
         wholeFileBuffer.position(oldPos);
         // At this point everything is checked. So we relax and just load
         // the data now.
         if (logger.isTraceEnabled()) {
            logger.trace("reading {}, userRecordType={}, compactCount={}", recordID, userRecordType, compactCount);
         }
         boolean replaceableUpdate = replaceableRecords != null && replaceableRecords.containsKey(userRecordType);
         switch (recordType) {
            case EVENT_RECORD: {
               reader.onReadEventRecord(new RecordInfo(recordID, userRecordType, record, false, replaceableUpdate, compactCount));
               break;
            }
            case ADD_RECORD: {
               reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false, false, compactCount));
               break;
            }
            case UPDATE_RECORD: {
               reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true, replaceableUpdate, compactCount));
               break;
            }
            case DELETE_RECORD: {
               reader.onReadDeleteRecord(recordID);
               break;
            }
            case ADD_RECORD_TX: {
               reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false, false, compactCount));
               break;
            }
            case UPDATE_RECORD_TX: {
               reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true, replaceableUpdate, compactCount));
               break;
            }
            case DELETE_RECORD_TX: {
               reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte) 0, record, true, false, compactCount));
               break;
            }
            case PREPARE_RECORD: {
               // The extra data follows the fixed prepare fields; its size was validated as non-negative above
               byte[] extraData = new byte[preparedTransactionExtraDataSize];
               wholeFileBuffer.get(extraData);
               reader.onReadPrepareRecord(transactionID, extraData, transactionCheckNumberOfRecords);
               break;
            }
            case COMMIT_RECORD: {
               reader.onReadCommitRecord(transactionID, transactionCheckNumberOfRecords);
               break;
            }
            case ROLLBACK_RECORD: {
               reader.onReadRollbackRecord(transactionID);
               break;
            }
            default: {
               throw new IllegalStateException("Journal " + file.getFile().getFileName() +
                                                  " is corrupt, invalid record type " +
                                                  recordType);
            }
         }
         // Consume the trailing checkSize so the buffer sits at the start of the next record
         checkSize = wholeFileBuffer.getInt();
         // This is a sanity check about the loading code itself.
         // If this checkSize doesn't match, it means the reading method is
         // not doing what it was supposed to do
         if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize) {
            throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize, file = " + file.getFile() +
                                               ", pos = " +
                                               pos);
         }
         lastDataPos = wholeFileBuffer.position();
      }
      reader.done();
      return lastDataPos;
   } catch (Throwable e) {
      ActiveMQJournalLogger.LOGGER.errorReadingFile(e);
      throw new Exception(e.getMessage(), e);
   } finally {
      // Only release the buffer when the caller did not supply a reference to hold it
      if (wholeFileBufferReference == null && wholeFileBuffer != null) {
         fileFactory.releaseDirectBuffer(wholeFileBuffer);
      }
      try {
         file.getFile().close(false, false);
      } catch (Throwable ignored) {
         // best effort: a failure to close must not mask the result or the original exception
      }
   }
}