public JournalLoadInformation loadMessageJournal()

in artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java [1030:1437]


   public JournalLoadInformation loadMessageJournal(final PostOffice postOffice,
                                                    final PagingManager pagingManager,
                                                    final ResourceManager resourceManager,
                                                    Map<Long, QueueBindingInfo> queueInfos,
                                                    final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
                                                    final Set<Pair<Long, Long>> pendingLargeMessages,
                                                    final Set<Long> storedLargeMessages,
                                                    List<PageCountPending> pendingNonTXPageCounter,
                                                    final JournalLoader journalLoader,
                                                    final List<Consumer<RecordInfo>> journalRecordsListener) throws Exception {
      SparseArrayLinkedList<RecordInfo> records = new SparseArrayLinkedList<>();

      List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>();

      Set<PageTransactionInfo> invalidPageTransactions = new HashSet<>();

      Map<Long, Message> messages = new HashMap<>();
      try (ArtemisCloseable lock = closeableReadLock()) {
         messageJournal.setRemoveExtraFilesOnLoad(true);
         JournalLoadInformation info = messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(this));

         List<LargeServerMessage> largeMessages = new ArrayList<>();

         Map<Long, Map<Long, AddMessageRecord>> queueMap = new HashMap<>();

         Map<Long, PageSubscription> pageSubscriptions = new HashMap<>();

         final long totalSize = records.size();

         final class MutableLong {

            long value;
         }

         final MutableLong recordNumber = new MutableLong();
         final CoreMessageObjectPools pools;
         if (totalSize > 0) {
            final int addresses = (int) Math.max(DEFAULT_POOL_CAPACITY, queueInfos == null ? 0 : queueInfos.values().stream().map(qInfo -> qInfo.getQueueConfiguration().getAddress()).filter(addr -> addr.length() <= DEFAULT_MAX_LENGTH).count() * 2);
            pools = new CoreMessageObjectPools(addresses, DEFAULT_POOL_CAPACITY, 128, 128);
         } else {
            pools = null;
         }
         // This will free up memory sooner while reading the records
         records.clear(record -> {
            try {
               // It will show log.info only with large journals (more than 1 million records)
               if (recordNumber.value > 0 && recordNumber.value % 1000000 == 0) {
                  long percent = (long) ((((double) recordNumber.value) / ((double) totalSize)) * 100f);

                  ActiveMQServerLogger.LOGGER.percentLoaded(percent);
               }
               recordNumber.value++;

               byte[] data = record.data;

               // We can make this byte[] buffer releasable, because subsequent methods using it are not supposed
               // to release it. It saves creating useless UnreleasableByteBuf wrappers
               ChannelBufferWrapper buff = new ChannelBufferWrapper(Unpooled.wrappedBuffer(data), true);

               byte recordType = record.getUserRecordType();

               switch (recordType) {
                  case JournalRecordIds.ADD_LARGE_MESSAGE_PENDING: {
                     PendingLargeMessageEncoding pending = new PendingLargeMessageEncoding();

                     pending.decode(buff);

                     if (pendingLargeMessages != null) {
                        // it could be null on tests, and we don't need anything on that case
                        pendingLargeMessages.add(new Pair<>(record.id, pending.largeMessageID));
                     }
                     break;
                  }
                  case JournalRecordIds.ADD_LARGE_MESSAGE: {
                     LargeServerMessage largeMessage = parseLargeMessage(buff);

                     messages.put(record.id, largeMessage.toMessage());

                     if (storedLargeMessages != null) {
                        storedLargeMessages.remove(largeMessage.getMessageID());
                     }

                     largeMessages.add(largeMessage);

                     break;
                  }
                  case JournalRecordIds.ADD_MESSAGE: {
                     throw new IllegalStateException("This is using old journal data, export your data and import at the correct version");
                  }

                  case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {

                     Message message = decodeMessage(pools, buff);

                     if (message.isLargeMessage() && storedLargeMessages != null) {
                        storedLargeMessages.remove(message.getMessageID());
                     }

                     if (message.isLargeMessage()) {
                        largeMessages.add((LargeServerMessage) message);
                     }

                     messages.put(record.id, message);

                     break;
                  }
                  case JournalRecordIds.ADD_REF: {
                     long messageID = record.id;

                     RefEncoding encoding = new RefEncoding();

                     encoding.decode(buff);

                     Map<Long, AddMessageRecord> queueMessages = queueMap.get(encoding.queueID);

                     if (queueMessages == null) {
                        queueMessages = new LinkedHashMap<>();

                        queueMap.put(encoding.queueID, queueMessages);
                     }

                     Message message = messages.get(messageID);

                     if (message == null) {
                        ActiveMQServerLogger.LOGGER.cannotFindMessage(record.id);
                     } else {
                        queueMessages.put(messageID, new AddMessageRecord(message));
                     }

                     break;
                  }
                  case JournalRecordIds.ACKNOWLEDGE_REF: {
                     long messageID = record.id;

                     RefEncoding encoding = new RefEncoding();

                     encoding.decode(buff);

                     Map<Long, AddMessageRecord> queueMessages = queueMap.get(encoding.queueID);

                     if (queueMessages == null) {
                        ActiveMQServerLogger.LOGGER.journalCannotFindQueue(encoding.queueID, messageID);
                     } else {
                        AddMessageRecord rec = queueMessages.remove(messageID);

                        if (rec == null) {
                           ActiveMQServerLogger.LOGGER.cannotFindMessage(messageID);
                        }
                     }

                     break;
                  }
                  case JournalRecordIds.UPDATE_DELIVERY_COUNT: {
                     long messageID = record.id;

                     DeliveryCountUpdateEncoding encoding = new DeliveryCountUpdateEncoding();

                     encoding.decode(buff);

                     Map<Long, AddMessageRecord> queueMessages = queueMap.get(encoding.queueID);

                     if (queueMessages == null) {
                        ActiveMQServerLogger.LOGGER.journalCannotFindQueueDelCount(encoding.queueID);
                     } else {
                        AddMessageRecord rec = queueMessages.get(messageID);

                        if (rec == null) {
                           ActiveMQServerLogger.LOGGER.journalCannotFindMessageDelCount(messageID);
                        } else {
                           rec.setDeliveryCount(encoding.count);
                        }
                     }

                     break;
                  }
                  case JournalRecordIds.PAGE_TRANSACTION: {
                     PageTransactionInfo invalidPGTx = null;
                     if (record.isUpdate) {
                        PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();

                        pageUpdate.decode(buff);

                        PageTransactionInfo pageTX = pagingManager.getTransaction(pageUpdate.pageTX);

                        if (pageTX == null) {
                           ActiveMQServerLogger.LOGGER.journalCannotFindPageTX(pageUpdate.pageTX);
                        } else {
                           if (!pageTX.onUpdate(pageUpdate.records, null, null)) {
                              invalidPGTx = pageTX;
                           }
                        }
                     } else {
                        PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();

                        pageTransactionInfo.decode(buff);

                        pageTransactionInfo.setRecordID(record.id);

                        pagingManager.addTransaction(pageTransactionInfo);

                        if (!pageTransactionInfo.checkSize(null, null)) {
                           invalidPGTx = pageTransactionInfo;
                        }
                     }

                     if (invalidPGTx != null) {
                        invalidPageTransactions.add(invalidPGTx);
                     }

                     break;
                  }
                  case JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME: {
                     long messageID = record.id;

                     ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding();

                     encoding.decode(buff);

                     Map<Long, AddMessageRecord> queueMessages = queueMap.get(encoding.queueID);

                     if (queueMessages == null) {
                        ActiveMQServerLogger.LOGGER.journalCannotFindQueueScheduled(encoding.queueID, messageID);
                     } else {

                        AddMessageRecord rec = queueMessages.get(messageID);

                        if (rec == null) {
                           ActiveMQServerLogger.LOGGER.cannotFindMessage(messageID);
                        } else {
                           rec.setScheduledDeliveryTime(encoding.scheduledDeliveryTime);
                        }
                     }

                     break;
                  }
                  case JournalRecordIds.DUPLICATE_ID: {
                     DuplicateIDEncoding encoding = new DuplicateIDEncoding();

                     encoding.decode(buff);

                     List<Pair<byte[], Long>> ids = duplicateIDMap.get(encoding.address);

                     if (ids == null) {
                        ids = new ArrayList<>();

                        duplicateIDMap.put(encoding.address, ids);
                     }

                     ids.add(new Pair<>(encoding.duplID, record.id));

                     break;
                  }
                  case JournalRecordIds.HEURISTIC_COMPLETION: {
                     HeuristicCompletionEncoding encoding = new HeuristicCompletionEncoding();
                     encoding.decode(buff);
                     resourceManager.putHeuristicCompletion(record.id, encoding.xid, encoding.isCommit);
                     break;
                  }
                  case JournalRecordIds.ACKNOWLEDGE_CURSOR: {
                     CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
                     encoding.decode(buff);

                     encoding.position.setRecordID(record.id);

                     PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);

                     if (sub != null) {
                        sub.reloadACK(encoding.position);
                     } else {
                        ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloading(encoding.queueID);
                        messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false);

                     }

                     break;
                  }
                  case JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE: {
                     PageCountRecord encoding = new PageCountRecord();

                     encoding.decode(buff);

                     PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);

                     if (sub != null) {
                        sub.getCounter().loadValue(record.id, encoding.getValue(), encoding.getPersistentSize());
                        if (encoding.getValue() > 0) {
                           sub.notEmpty();
                        }
                     } else {
                        ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPage(encoding.getQueueID());
                        messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false);
                     }

                     break;
                  }

                  case JournalRecordIds.PAGE_CURSOR_COUNTER_INC: {
                     PageCountRecordInc encoding = new PageCountRecordInc();

                     encoding.decode(buff);

                     PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);

                     if (sub != null) {
                        sub.getCounter().loadInc(record.id, encoding.getValue(), encoding.getPersistentSize());
                     } else {
                        ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPageCursor(encoding.getQueueID());
                        messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false);
                     }

                     break;
                  }

                  case JournalRecordIds.PAGE_CURSOR_COMPLETE: {
                     CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
                     encoding.decode(buff);

                     encoding.position.setRecordID(record.id);

                     PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);

                     if (sub != null) {
                        if (!sub.reloadPageCompletion(encoding.position)) {
                           if (logger.isDebugEnabled()) {
                              logger.debug("Complete page {} doesn't exist on page manager {}", encoding.position.getPageNr(), sub.getPagingStore().getAddress());
                           }
                           messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false);
                        }
                     } else {
                        ActiveMQServerLogger.LOGGER.cantFindQueueOnPageComplete(encoding.queueID);
                        messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false);
                     }

                     break;
                  }

                  case JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER: {

                     PageCountPendingImpl pendingCountEncoding = new PageCountPendingImpl();

                     pendingCountEncoding.decode(buff);
                     pendingCountEncoding.setID(record.id);
                     PageSubscription sub = locateSubscription(pendingCountEncoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
                     if (sub != null) {
                        sub.notEmpty();
                     }
                     // This can be null on testcases not interested on this outcome
                     if (pendingNonTXPageCounter != null) {
                        pendingNonTXPageCounter.add(pendingCountEncoding);
                     }

                     break;
                  }

                  default: {
                     logger.debug("Extra record type {}", record.userRecordType);
                     if (journalRecordsListener != null) {
                        journalRecordsListener.forEach(f -> f.accept(record));
                     }
                  }
               }
            } catch (RuntimeException e) {
               throw e;
            } catch (Exception e) {
               throw new RuntimeException(e);
            }
         });

         // Release the memory as soon as not needed any longer
         records = null;

         journalLoader.handleAddMessage(queueMap);

         loadPreparedTransactions(postOffice, pagingManager, resourceManager, queueInfos, preparedTransactions, this::failedToPrepareException, pageSubscriptions, pendingLargeMessages, storedLargeMessages, journalLoader);

         for (PageSubscription sub : pageSubscriptions.values()) {
            sub.getCounter().processReload();
         }

         for (LargeServerMessage msg : largeMessages) {
            if (storedLargeMessages != null && storedLargeMessages.remove(msg.getMessageID())) {
               if (logger.isDebugEnabled()) {
                  logger.debug("Large message in folder removed on {}", msg.getMessageID());
               }
            }
            if (msg.toMessage().getRefCount() == 0 && msg.toMessage().getDurableCount() == 0) {
               ActiveMQServerLogger.LOGGER.largeMessageWithNoRef(msg.getMessageID());
               msg.toMessage().usageDown();
            }
         }

         journalLoader.handleNoMessageReferences(messages);

         // To recover positions on Iterators
         if (pagingManager != null) {
            // it could be null on certain tests that are not dealing with paging
            // This could also be the case in certain embedded conditions
            pagingManager.processReload();
         }

         journalLoader.postLoad(messageJournal, resourceManager, duplicateIDMap);

         checkInvalidPageTransactions(pagingManager, invalidPageTransactions);

         journalLoaded = true;
         return info;
      }
   }