private synchronized JournalLoadInformation load()

in artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java [2018:2329]


   private synchronized JournalLoadInformation load(final LoaderCallback loadManager,
                                                    final boolean changeData,
                                                    final JournalState replicationSync,
                                                    final AtomicReference<ByteBuffer> wholeFileBufferRef) throws Exception {
      JournalState state;
      assert (state = this.state) != JournalState.STOPPED &&
         state != JournalState.LOADED &&
         state != replicationSync;

      checkControlFile(wholeFileBufferRef);

      records.clear();

      filesRepository.clear();

      transactions.clear();
      currentFile = null;

      final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<>();

      final List<JournalFile> orderedFiles = orderFiles();

      filesRepository.calculateNextfileID(orderedFiles);

      int lastDataPos = JournalImpl.SIZE_HEADER;

      // AtomicLong is used only as a reference, not as an Atomic value
      final AtomicLong maxID = new AtomicLong(-1);

      for (final JournalFile file : orderedFiles) {
         logger.trace("Loading file {}", file.getFile().getFileName());

         final AtomicBoolean hasData = new AtomicBoolean(false);

         int resultLastPost = JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback() {

            private void checkID(final long id) {
               if (id > maxID.longValue()) {
                  maxID.lazySet(id);
               }
            }

            @Override
            public void onReadAddRecord(final RecordInfo info) throws Exception {
               checkID(info.id);

               hasData.lazySet(true);

               loadManager.addRecord(info);

               records.put(info.id, new JournalRecord(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1));
            }

            @Override
            public void onReadUpdateRecord(final RecordInfo info) throws Exception {
               checkID(info.id);

               hasData.lazySet(true);

               loadManager.updateRecord(info);

               JournalRecord posFiles = records.get(info.id);

               if (posFiles != null) {
                  // It's legal for this to be null. The file(s) with the may
                  // have been deleted
                  // just leaving some updates in this file

                  posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1, info.replaceableUpdate); // +1 = compact
                  // count
               }
            }

            @Override
            public void onReadDeleteRecord(final long recordID) throws Exception {
               hasData.lazySet(true);

               loadManager.deleteRecord(recordID);

               JournalRecord posFiles = records.remove(recordID);

               if (posFiles != null) {
                  posFiles.delete(file);
               }
            }

            @Override
            public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception {
               onReadAddRecordTX(transactionID, info);
            }

            @Override
            public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception {

               checkID(info.id);

               hasData.lazySet(true);

               TransactionHolder tx = loadTransactions.get(transactionID);

               if (tx == null) {
                  tx = new TransactionHolder(transactionID);

                  loadTransactions.put(transactionID, tx);
               }

               tx.recordInfos.add(info);

               JournalTransaction tnp = transactions.get(transactionID);

               if (tnp == null) {
                  tnp = new JournalTransaction(transactionID, JournalImpl.this);

                  transactions.put(transactionID, tnp);
               }

               tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1, info.replaceableUpdate); // +1 = compact
               // count
            }

            @Override
            public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception {
               hasData.lazySet(true);

               TransactionHolder tx = loadTransactions.get(transactionID);

               if (tx == null) {
                  tx = new TransactionHolder(transactionID);

                  loadTransactions.put(transactionID, tx);
               }

               tx.recordsToDelete.add(info);

               JournalTransaction tnp = transactions.get(transactionID);

               if (tnp == null) {
                  tnp = new JournalTransaction(transactionID, JournalImpl.this);

                  transactions.put(transactionID, tnp);
               }

               tnp.addNegative(file, info.id);

            }

            @Override
            public void onReadPrepareRecord(final long transactionID,
                                            final byte[] extraData,
                                            final int numberOfRecords) throws Exception {
               hasData.lazySet(true);

               TransactionHolder tx = loadTransactions.get(transactionID);

               if (tx == null) {
                  // The user could choose to prepare empty transactions
                  tx = new TransactionHolder(transactionID);

                  loadTransactions.put(transactionID, tx);
               }

               tx.prepared = true;

               tx.extraData = extraData;

               JournalTransaction journalTransaction = transactions.get(transactionID);

               if (journalTransaction == null) {
                  journalTransaction = new JournalTransaction(transactionID, JournalImpl.this);

                  transactions.put(transactionID, journalTransaction);
               }

               boolean healthy = checkTransactionHealth(file, journalTransaction, orderedFiles, numberOfRecords);

               if (healthy) {
                  journalTransaction.prepare(file);
               } else {
                  ActiveMQJournalLogger.LOGGER.preparedTXIncomplete(transactionID);
                  tx.invalid = true;
               }
            }

            @Override
            public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception {
               TransactionHolder tx = loadTransactions.remove(transactionID);

               // The commit could be alone on its own journal-file and the
               // whole transaction body was reclaimed but not the
               // commit-record
               // So it is completely legal to not find a transaction at this
               // point
               // If we can't find it, we assume the TX was reclaimed and we
               // ignore this
               if (tx != null) {
                  JournalTransaction journalTransaction = transactions.remove(transactionID);

                  if (journalTransaction == null) {
                     throw new IllegalStateException("Cannot find tx " + transactionID);
                  }

                  boolean healthy = checkTransactionHealth(file, journalTransaction, orderedFiles, numberOfRecords);

                  if (healthy) {
                     for (RecordInfo txRecord : tx.recordInfos) {
                        if (txRecord.isUpdate) {
                           loadManager.updateRecord(txRecord);
                        } else {
                           loadManager.addRecord(txRecord);
                        }
                     }

                     for (RecordInfo deleteValue : tx.recordsToDelete) {
                        loadManager.deleteRecord(deleteValue.id);
                     }

                     journalTransaction.commit(file);
                  } else {
                     ActiveMQJournalLogger.LOGGER.txMissingElements(transactionID);

                     journalTransaction.forget();
                  }

                  hasData.lazySet(true);
               }

            }

            @Override
            public void onReadRollbackRecord(final long transactionID) throws Exception {
               TransactionHolder tx = loadTransactions.remove(transactionID);

               // The rollback could be alone on its own journal-file and the
               // whole transaction body was reclaimed but the commit-record
               // So it is completely legal to not find a transaction at this
               // point
               if (tx != null) {
                  JournalTransaction tnp = transactions.remove(transactionID);

                  if (tnp == null) {
                     throw new IllegalStateException("Cannot find tx " + transactionID);
                  }

                  // There is no need to validate summaries/holes on
                  // Rollbacks.. We will ignore the data anyway.
                  tnp.rollback(file);

                  hasData.lazySet(true);
               }
            }

            @Override
            public void markAsDataFile(final JournalFile file) {
               hasData.lazySet(true);
            }

         }, wholeFileBufferRef, false, this.replaceableRecords);

         if (hasData.get()) {
            lastDataPos = resultLastPost;
            filesRepository.addDataFileOnBottom(file);
         } else {
            if (changeData) {
               // Empty dataFiles with no data
               filesRepository.addFreeFile(file, false, isRemoveExtraFilesOnLoad());
            }
         }
      }

      if (replicationSync == JournalState.SYNCING) {
         assert filesRepository.getDataFiles().isEmpty();
         setJournalState(JournalState.SYNCING);
         return new JournalLoadInformation(0, -1);
      }

      setUpCurrentFile(lastDataPos);

      setJournalState(JournalState.LOADED);

      for (TransactionHolder transaction : loadTransactions.values()) {
         if ((!transaction.prepared || transaction.invalid) && replicationSync != JournalState.SYNCING_UP_TO_DATE) {
            ActiveMQJournalLogger.LOGGER.uncomittedTxFound(transaction.transactionID);

            if (changeData) {
               // I append a rollback record here, because otherwise compacting will be throwing messages because of unknown transactions
               this.appendRollbackRecord(transaction.transactionID, false);
            }

            loadManager.failedTransaction(transaction.transactionID, transaction.recordInfos, transaction.recordsToDelete);
         } else {
            for (RecordInfo info : transaction.recordInfos) {
               if (info.id > maxID.get()) {
                  maxID.lazySet(info.id);
               }
            }

            PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.extraData);

            info.getRecords().addAll(transaction.recordInfos);

            info.getRecordsToDelete().addAll(transaction.recordsToDelete);

            loadManager.addPreparedTransaction(info);
         }
      }

      if (changeData) {
         checkReclaimStatus();
      }

      return new JournalLoadInformation(records.size(), maxID.longValue());
   }