public void rebuild()

in artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java [195:319]


   public void rebuild() throws Exception {
      if (pgStore == null) {
         logger.trace("Page store is null during rebuildCounters");
         return;
      }

      if (!paging) {
         logger.trace("Ignoring call to rebuild pgStore {}", pgStore.getAddress());
      }

      logger.debug("Rebuilding page counter for address {}", pgStore.getAddress());

      for (long pgid = pgStore.getFirstPage(); pgid <= limitPageId; pgid++) {
         if (logger.isTraceEnabled()) {
            logger.trace("Rebuilding counter on messages from page {} on rebuildCounters for address {}", pgid, pgStore.getAddress());
         }
         logger.debug("{} reading paging {} of {}", pgStore.getAddress(), pgid, limitPageId);
         Page page = pgStore.newPageObject(pgid);

         if (!page.getFile().exists()) {
            if (logger.isDebugEnabled()) {
               logger.trace("Skipping page {} on store {}", pgid, pgStore.getAddress());
            }
            continue;
         }
         page.open(false);
         LinkedList<PagedMessage> msgs = page.read(sm);
         page.close(false, false);

         try (LinkedListIterator<PagedMessage> iter = msgs.iterator()) {
            while (iter.hasNext()) {
               PagedMessage msg = iter.next();
               if (storedLargeMessages != null && msg.getMessage().isLargeMessage()) {
                  if (logger.isDebugEnabled()) {
                     logger.trace("removing storedLargeMessage {}", msg.getMessage().getMessageID());
                  }
                  storedLargeMessages.remove(msg.getMessage().getMessageID());
               }
               if (limitPageId == pgid) {
                  if (msg.getMessageNumber() >= limitMessageNr) {
                     if (logger.isDebugEnabled()) {
                        logger.trace("Rebuild counting on {} reached the last message at {}-{}", pgStore.getAddress(), limitPageId, limitMessageNr);
                     }
                     // this is the limit where we should count..
                     // anything beyond this will be new data
                     break;
                  }
               }
               msg.initMessage(sm);
               long[] routedQueues = msg.getQueueIDs();

               if (logger.isTraceEnabled()) {
                  logger.trace("reading message for rebuild cursor on address={}, pg={}, messageNR={}, routedQueues={}, message={}, queueLIst={}", pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(), routedQueues, msg, routedQueues);
               }

               PageTransactionInfo txInfo = null;

               if (msg.getTransactionID() > 0) {
                  txInfo = transactions.get(msg.getTransactionID());
                  if (txInfo != null) {
                     txInfo.setOrphaned(false);
                  }
               }

               Transaction preparedTX = txInfo == null ? null : txInfo.getPreparedTransaction();

               if (logger.isTraceEnabled()) {
                  logger.trace("lookup on {}, tx={}, preparedTX={}", msg.getTransactionID(), txInfo, preparedTX);
               }

               for (long queueID : routedQueues) {
                  boolean ok = !isACK(queueID, msg.getPageNumber(), msg.getMessageNumber());

                  // if the pageTransaction is in prepare state, we have to increment the counter after the commit
                  // notice that there is a check if the commit is done in afterCommit
                  if (preparedTX != null) {
                     PageSubscription subscription = pgStore.getCursorProvider().getSubscription(queueID);
                     preparedTX.addOperation(new TransactionOperationAbstract() {
                        @Override
                        public void afterCommit(Transaction tx) {
                           // We use the pagingManager executor here, in case the commit happened while the rebuild manager is working
                           // in that case the increment will wait any pending tasks on that executor to finish before this executor takes effect
                           pagingManager.execute(() -> {
                              try {
                                 subscription.getCounter().increment(null, 1, msg.getStoredSize());
                              } catch (Exception e) {
                                 logger.warn(e.getMessage(), e);
                              }
                           });
                        }
                     });

                  } else {
                     boolean txIncluded = msg.getTransactionID() <= 0 || transactions == null || txInfo != null;

                     if (!txIncluded) {
                        logger.trace("TX is not included for {}", msg);
                     }

                     if (ok && txIncluded) { // not acked and TX is ok
                        if (logger.isTraceEnabled()) {
                           logger.trace("Message pageNumber={}/{} NOT acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID);
                        }
                        CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
                        if (copiedSubscription != null) {
                           copiedSubscription.empty = false;
                           copiedSubscription.addUp++;
                           copiedSubscription.sizeUp += msg.getPersistentSize();
                        }
                     } else {
                        if (logger.isTraceEnabled()) {
                           logger.trace("Message pageNumber={}/{} IS acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID);
                        }
                     }
                  }
               }
            }
         }
      }

      logger.debug("Counter rebuilding done for address {}", pgStore.getAddress());

      done();

   }