public static int readJournalFile()

in artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java [583:893]


   /**
    * Loads a whole journal file into a buffer, scans it for records and dispatches every record that
    * passes validation to the matching callback on {@code reader}.
    * <p>
    * The file is opened before the scan and is always closed before this method returns. Scanning starts
    * at offset {@code JournalImpl.SIZE_HEADER} and runs until the buffer has no bytes remaining. Fields
    * are parsed in this order:
    * <ol>
    *    <li>record type (byte), which must lie between {@code EVENT_RECORD} and {@code ROLLBACK_RECORD};</li>
    *    <li>file id (int), which must equal {@code file.getRecordID()} unless {@code reclaimed} is true;</li>
    *    <li>compact count (byte), only when {@code file.getJournalVersion() >= 2};</li>
    *    <li>transaction id (long), only when {@code JournalImpl.isTransaction(recordType)};</li>
    *    <li>record id (long), only when {@code !JournalImpl.isCompleteTransaction(recordType)};</li>
    *    <li>body size (int), user record type (byte, skipped for {@code DELETE_RECORD_TX}) and the body
    *        bytes, only when {@code JournalImpl.isContainsBody(recordType)};</li>
    *    <li>number of records (int) for {@code PREPARE_RECORD} and {@code COMMIT_RECORD}, followed by the
    *        extra-data size (int) for {@code PREPARE_RECORD};</li>
    *    <li>the extra data bytes for {@code PREPARE_RECORD};</li>
    *    <li>a trailing size (int) that must equal the size computed from the fields above.</li>
    * </ol>
    * Whenever a candidate fails one of the checks, the buffer is repositioned one byte after the place
    * where the candidate started and the scan goes on. Several (not all) of these rejection paths also
    * call {@code reader.markAsDataFile(file)}.
    *
    * @param fileFactory              factory used to obtain the buffer (through
    *                                 {@code allocateDirectBufferIfNeeded}) and to release it afterwards
    * @param file                     the journal file to read
    * @param reader                   callback receiving the decoded records, {@code markAsDataFile}
    *                                 notifications and a final {@code done()} once the scan completes
    * @param wholeFileBufferReference passed to {@code allocateDirectBufferIfNeeded}; when it is
    *                                 {@code null} the buffer is released through {@code fileFactory} before
    *                                 returning (presumably a holder for a caller-owned reusable buffer -
    *                                 confirm against {@code allocateDirectBufferIfNeeded})
    * @param reclaimed                when true, records whose file id differs from
    *                                 {@code file.getRecordID()} are not skipped
    * @param replaceableRecords       may be {@code null}; when present, {@code containsKey(userRecordType)}
    *                                 supplies the flag handed to {@code RecordInfo} for event and update
    *                                 records
    * @return {@code -1} when the file is smaller than {@code JournalImpl.SIZE_HEADER} (in that case
    *         {@code reader.done()} is not called); otherwise the buffer position right after the last
    *         record that was dispatched, or {@code JournalImpl.SIZE_HEADER} when no record was dispatched
    * @throws Exception wrapping any {@code Throwable} raised while reading or dispatching, after it has been
    *                   reported through {@code ActiveMQJournalLogger.LOGGER.errorReadingFile}
    */
   public static int readJournalFile(final SequentialFileFactory fileFactory,
                              final JournalFile file,
                              final JournalReaderCallback reader,
                              final AtomicReference<ByteBuffer> wholeFileBufferReference,
                              boolean reclaimed, ByteObjectHashMap<Boolean> replaceableRecords) throws Exception {
      file.getFile().open(1, false);
      ByteBuffer wholeFileBuffer = null;
      try {
         final int filesize = (int) file.getFile().size();

         if (filesize < JournalImpl.SIZE_HEADER) {
            // the file is damaged or the system crashed before it was able to write the header
            return -1;
         }
         wholeFileBuffer = allocateDirectBufferIfNeeded(fileFactory, filesize, wholeFileBufferReference);

         final int journalFileSize = file.getFile().read(wholeFileBuffer);

         // A short read is treated as fatal: the scan below assumes the full file is in the buffer
         if (journalFileSize != filesize) {
            throw new RuntimeException("Invalid read! The system couldn't read the entire file into memory");
         }

         // First long is the ordering timestamp, we just jump its position
         wholeFileBuffer.position(JournalImpl.SIZE_HEADER);

         // Position right after the last record dispatched so far; this is the value returned
         int lastDataPos = JournalImpl.SIZE_HEADER;

         while (wholeFileBuffer.hasRemaining()) {
            // Start of the candidate record; every rejection path rewinds to pos + 1
            final int pos = wholeFileBuffer.position();

            byte recordType = wholeFileBuffer.get();

            if (recordType < JournalImpl.EVENT_RECORD || recordType > JournalImpl.ROLLBACK_RECORD) {
               // I - We scan for any valid record on the file. If a hole
               // happened on the middle of the file we keep looking until all
               // the possibilities are gone
               // (the get() above already moved the buffer to pos + 1, so no repositioning is needed)
               continue;
            }

            if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT)) {
               reader.markAsDataFile(file);

               wholeFileBuffer.position(pos + 1);
               // II - Ignore this record, let's keep looking
               continue;
            }

            // III - Every record has the file-id.
            // This is what allows us to avoid re-filling the whole file
            int readFileId = wholeFileBuffer.getInt();

            // This record is from a previous file-usage. The file was
            // reused and we need to ignore this record
            // (the file-id check is bypassed when reclaimed is true)
            if (readFileId != file.getRecordID() && !reclaimed) {
               wholeFileBuffer.position(pos + 1);
               continue;
            }

            short compactCount = 0;

            // The compact count is only read for journal version 2 and above
            if (file.getJournalVersion() >= 2) {
               if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_BYTE)) {
                  reader.markAsDataFile(file);

                  wholeFileBuffer.position(pos + 1);
                  continue;
               }

               // Stored as a single byte, widened into the short
               compactCount = wholeFileBuffer.get();
            }

            long transactionID = 0;

            if (JournalImpl.isTransaction(recordType)) {
               if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_LONG)) {
                  wholeFileBuffer.position(pos + 1);
                  reader.markAsDataFile(file);
                  continue;
               }

               transactionID = wholeFileBuffer.getLong();
            }

            long recordID = 0;

            // A record id is read only when isCompleteTransaction(recordType) is false.
            // NOTE(review): presumably prepare/commit/rollback are the "complete transaction" types and
            // carry no record id (their callbacks below do not use recordID) - confirm against
            // JournalImpl.isCompleteTransaction
            if (!JournalImpl.isCompleteTransaction(recordType)) {
               if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_LONG)) {
                  wholeFileBuffer.position(pos + 1);
                  reader.markAsDataFile(file);
                  continue;
               }

               recordID = wholeFileBuffer.getLong();
            }

            // We use the size of the record to validate the health of the
            // record.
            // (V) We verify the size of the record

            // The variable record portion used on Updates and Appends
            int variableSize = 0;

            // Used to hold extra data on transaction prepares
            int preparedTransactionExtraDataSize = 0;

            byte userRecordType = 0;

            // Body bytes; stays null for record types without a body
            byte[] record = null;

            if (JournalImpl.isContainsBody(recordType)) {
               if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT)) {
                  wholeFileBuffer.position(pos + 1);
                  reader.markAsDataFile(file);
                  continue;
               }

               variableSize = wholeFileBuffer.getInt();

               // DELETE_RECORD_TX has a body but no user record type byte
               if (recordType != JournalImpl.DELETE_RECORD_TX) {
                  if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), 1)) {
                     wholeFileBuffer.position(pos + 1);
                     continue;
                  }

                  userRecordType = wholeFileBuffer.get();
               }

               // Make sure the announced body fits in the file before allocating and reading it
               if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), variableSize)) {
                  wholeFileBuffer.position(pos + 1);
                  continue;
               }

               record = new byte[variableSize];

               wholeFileBuffer.get(record);
            }

            // Case this is a transaction, this will contain the number of pendingTransactions on a transaction, at the
            // currentFile
            int transactionCheckNumberOfRecords = 0;

            if (recordType == JournalImpl.PREPARE_RECORD || recordType == JournalImpl.COMMIT_RECORD) {
               if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT)) {
                  wholeFileBuffer.position(pos + 1);
                  continue;
               }

               transactionCheckNumberOfRecords = wholeFileBuffer.getInt();

               if (recordType == JournalImpl.PREPARE_RECORD) {
                  if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT)) {
                     wholeFileBuffer.position(pos + 1);
                     continue;
                  }
                  // Add the variable size required for preparedTransactions
                  preparedTransactionExtraDataSize = wholeFileBuffer.getInt();
               }
               // Reset so that variableSize contributes nothing to the size checks below for prepare/commit
               variableSize = 0;
            }

            int recordSize = JournalImpl.getRecordSize(recordType, file.getJournalVersion());

            // VI - this is completing V, We will validate the size at the end
            // of the record,
            // But we avoid buffer overflows by damaged data
            if (JournalImpl.isInvalidSize(journalFileSize, pos, recordSize + variableSize +
               preparedTransactionExtraDataSize)) {
               // Avoid a buffer overflow caused by damaged data... continue
               // scanning for more pendingTransactions...
               if (logger.isTraceEnabled()) {
                  logger.trace("Record at position {} recordType = {} file:{} recordSize: {} variableSize: {} preparedTransactionExtraDataSize: {} is corrupted and it is being ignored (II)",
                               pos, recordType, file.getFile().getFileName(), recordSize, variableSize, preparedTransactionExtraDataSize);
               }
               // If a file has damaged pendingTransactions, we make it a dataFile, and the
               // next reclaiming will fix it
               reader.markAsDataFile(file);
               wholeFileBuffer.position(pos + 1);

               continue;
            }

            // Remember where the parsing stopped so it can be resumed after peeking at the trailing size
            int oldPos = wholeFileBuffer.position();

            // Jump to the last int of the record, where the trailing size is stored
            wholeFileBuffer.position(pos + variableSize +
                                        recordSize +
                                        preparedTransactionExtraDataSize - DataConstants.SIZE_INT);

            int checkSize = wholeFileBuffer.getInt();

            // VII - The checkSize at the end has to match with the size
            // informed at the beginning.
            // This is like testing a hash for the record. (We could replace the
            // checkSize by some sort of calculated hash)
            if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize) {
               if (logger.isTraceEnabled()) {
                  logger.trace("Record at position {} recordType = {} possible transactionID = {} possible recordID = {} file:{} is corrupted and it is being ignored (III)",
                               pos, recordType, transactionID, recordID, file.getFile().getFileName());
               }

               // If a file has damaged pendingTransactions, we make it a dataFile, and the
               // next reclaiming will fix it
               reader.markAsDataFile(file);

               wholeFileBuffer.position(pos + DataConstants.SIZE_BYTE);

               continue;
            }

            // Go back to where the field parsing stopped (before the peek at the trailing size)
            wholeFileBuffer.position(oldPos);

            // At this point everything is checked. So we relax and just load
            // the data now.

            if (logger.isTraceEnabled()) {
               logger.trace("reading {}, userRecordType={}, compactCount={}", recordID, userRecordType, compactCount);
            }

            // False when no map was supplied; otherwise true when the map has an entry for this user record type
            boolean replaceableUpdate =  replaceableRecords != null ? replaceableRecords.containsKey(userRecordType) : false;

            switch (recordType) {
               case EVENT_RECORD: {
                  reader.onReadEventRecord(new RecordInfo(recordID, userRecordType, record, false, replaceableUpdate, compactCount));
                  break;
               }

               case ADD_RECORD: {
                  reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false, false, compactCount));
                  break;
               }

               case UPDATE_RECORD: {
                  reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true, replaceableUpdate, compactCount));
                  break;
               }

               case DELETE_RECORD: {
                  reader.onReadDeleteRecord(recordID);
                  break;
               }

               case ADD_RECORD_TX: {
                  reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false, false, compactCount));
                  break;
               }

               case UPDATE_RECORD_TX: {
                  reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true, replaceableUpdate, compactCount));
                  break;
               }

               case DELETE_RECORD_TX: {
                  // No user record type is stored for this record type, so 0 is passed
                  reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte) 0, record, true, false, compactCount));
                  break;
               }

               case PREPARE_RECORD: {

                  // The extra data sits between the header fields and the trailing size; reading it here
                  // also moves the buffer onto the trailing size
                  byte[] extraData = new byte[preparedTransactionExtraDataSize];

                  wholeFileBuffer.get(extraData);

                  reader.onReadPrepareRecord(transactionID, extraData, transactionCheckNumberOfRecords);

                  break;
               }
               case COMMIT_RECORD: {

                  reader.onReadCommitRecord(transactionID, transactionCheckNumberOfRecords);
                  break;
               }
               case ROLLBACK_RECORD: {
                  reader.onReadRollbackRecord(transactionID);
                  break;
               }
               default: {
                  throw new IllegalStateException("Journal " + file.getFile().getFileName() +
                                                     " is corrupt, invalid record type " +
                                                     recordType);
               }
            }

            // Consume the trailing size; the buffer is expected to be positioned exactly on it by now
            checkSize = wholeFileBuffer.getInt();

            // This is a sanity check about the loading code itself.
            // If this checkSize doesn't match, it means the reading method is
            // not doing what it was supposed to do
            if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize) {
               throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize, file = " + file.getFile() +
                                                  ", pos = " +
                                                  pos);
            }

            lastDataPos = wholeFileBuffer.position();

         }
         // Only reached when the whole buffer was scanned without an exception
         reader.done();
         return lastDataPos;
      } catch (Throwable e) {
         // Everything (including the RuntimeException / IllegalStateException thrown above) is logged
         // and rethrown wrapped in a checked Exception that keeps the original as its cause
         ActiveMQJournalLogger.LOGGER.errorReadingFile(e);
         throw new Exception(e.getMessage(), e);
      } finally {
         // The buffer is released here only when the caller supplied no reference for it.
         // NOTE(review): presumably a non-null reference keeps the buffer for reuse by the caller - confirm
         if (wholeFileBufferReference == null && wholeFileBuffer != null) {
            fileFactory.releaseDirectBuffer(wholeFileBuffer);
         }
         try {
            file.getFile().close(false, false);
         } catch (Throwable ignored) {
            // Failures while closing are ignored so they cannot replace the result or exception of the read
         }
      }
   }