in artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java [195:319]
public void rebuild() throws Exception {
if (pgStore == null) {
logger.trace("Page store is null during rebuildCounters");
return;
}
if (!paging) {
logger.trace("Ignoring call to rebuild pgStore {}", pgStore.getAddress());
}
logger.debug("Rebuilding page counter for address {}", pgStore.getAddress());
for (long pgid = pgStore.getFirstPage(); pgid <= limitPageId; pgid++) {
if (logger.isTraceEnabled()) {
logger.trace("Rebuilding counter on messages from page {} on rebuildCounters for address {}", pgid, pgStore.getAddress());
}
logger.debug("{} reading paging {} of {}", pgStore.getAddress(), pgid, limitPageId);
Page page = pgStore.newPageObject(pgid);
if (!page.getFile().exists()) {
if (logger.isDebugEnabled()) {
logger.trace("Skipping page {} on store {}", pgid, pgStore.getAddress());
}
continue;
}
page.open(false);
LinkedList<PagedMessage> msgs = page.read(sm);
page.close(false, false);
try (LinkedListIterator<PagedMessage> iter = msgs.iterator()) {
while (iter.hasNext()) {
PagedMessage msg = iter.next();
if (storedLargeMessages != null && msg.getMessage().isLargeMessage()) {
if (logger.isDebugEnabled()) {
logger.trace("removing storedLargeMessage {}", msg.getMessage().getMessageID());
}
storedLargeMessages.remove(msg.getMessage().getMessageID());
}
if (limitPageId == pgid) {
if (msg.getMessageNumber() >= limitMessageNr) {
if (logger.isDebugEnabled()) {
logger.trace("Rebuild counting on {} reached the last message at {}-{}", pgStore.getAddress(), limitPageId, limitMessageNr);
}
// this is the limit where we should count..
// anything beyond this will be new data
break;
}
}
msg.initMessage(sm);
long[] routedQueues = msg.getQueueIDs();
if (logger.isTraceEnabled()) {
logger.trace("reading message for rebuild cursor on address={}, pg={}, messageNR={}, routedQueues={}, message={}, queueLIst={}", pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(), routedQueues, msg, routedQueues);
}
PageTransactionInfo txInfo = null;
if (msg.getTransactionID() > 0) {
txInfo = transactions.get(msg.getTransactionID());
if (txInfo != null) {
txInfo.setOrphaned(false);
}
}
Transaction preparedTX = txInfo == null ? null : txInfo.getPreparedTransaction();
if (logger.isTraceEnabled()) {
logger.trace("lookup on {}, tx={}, preparedTX={}", msg.getTransactionID(), txInfo, preparedTX);
}
for (long queueID : routedQueues) {
boolean ok = !isACK(queueID, msg.getPageNumber(), msg.getMessageNumber());
// if the pageTransaction is in prepare state, we have to increment the counter after the commit
// notice that there is a check if the commit is done in afterCommit
if (preparedTX != null) {
PageSubscription subscription = pgStore.getCursorProvider().getSubscription(queueID);
preparedTX.addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
// We use the pagingManager executor here, in case the commit happened while the rebuild manager is working
// in that case the increment will wait any pending tasks on that executor to finish before this executor takes effect
pagingManager.execute(() -> {
try {
subscription.getCounter().increment(null, 1, msg.getStoredSize());
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
});
}
});
} else {
boolean txIncluded = msg.getTransactionID() <= 0 || transactions == null || txInfo != null;
if (!txIncluded) {
logger.trace("TX is not included for {}", msg);
}
if (ok && txIncluded) { // not acked and TX is ok
if (logger.isTraceEnabled()) {
logger.trace("Message pageNumber={}/{} NOT acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID);
}
CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
if (copiedSubscription != null) {
copiedSubscription.empty = false;
copiedSubscription.addUp++;
copiedSubscription.sizeUp += msg.getPersistentSize();
}
} else {
if (logger.isTraceEnabled()) {
logger.trace("Message pageNumber={}/{} IS acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID);
}
}
}
}
}
}
}
logger.debug("Counter rebuilding done for address {}", pgStore.getAddress());
done();
}