in artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java [338:518]
/**
 * Loads the given journal and prints a human-readable report of its contents to {@code out}.
 * <p>
 * The report consists of, in order:
 * <ol>
 * <li>every loaded record, followed by the running subscription counter value for page-counter records;</li>
 * <li>the accumulated page counters, if any were seen;</li>
 * <li>every prepared transaction with its records and its records marked for deletion;</li>
 * <li>the transactions reported as failed by {@code journal.load};</li>
 * <li>message, large-message and per-queue reference tallies, for both loaded records and prepared
 * transactions.</li>
 * </ol>
 * The journal is started before loading and is always stopped before this method returns or throws.
 *
 * @param journal the journal to start, load, describe and stop
 * @param out     the stream the report is written to
 * @param safe    forwarded unchanged to {@code describeRecord}; presumably selects a reduced-detail
 *                description - TODO confirm against {@code describeRecord}
 * @return a {@code DescribeJournal} holding the loaded records and prepared transactions, with its binding
 *         encodings set to the queue bindings found ({@code null} when no queue binding record was loaded)
 * @throws Exception if starting, loading or stopping the journal fails, or a record cannot be described
 */
public static DescribeJournal printSurvivingRecords(Journal journal,
                                                    PrintStream out,
                                                    boolean safe) throws Exception {
    final Map<Long, PageSubscriptionCounterImpl> counters = new HashMap<>();
    out.println("### Surviving Records Summary ###");

    List<RecordInfo> records = new LinkedList<>();
    List<PreparedTransactionInfo> preparedTransactions = new LinkedList<>();
    // Stays null unless at least one queue binding record is loaded.
    List<PersistentQueueBindingEncoding> bindings = null;

    // Mutable int holder so the per-queue tallies can be updated in place inside the maps.
    final class Count {
        int value;

        Count(int v) {
            value = v;
        }

        @Override
        public String toString() {
            return Integer.toString(value);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            Count count = (Count) o;
            return value == count.value;
        }

        @Override
        public int hashCode() {
            return Integer.hashCode(value);
        }
    }

    journal.start();
    try {
        final StringBuilder bufferFailingTransactions = new StringBuilder();

        long messageCount = 0;
        long largeMessageCount = 0;
        Map<Long, Count> messageRefCounts = new HashMap<>();

        long preparedMessageCount = 0;
        long preparedLargeMessageCount = 0;
        Map<Long, Count> preparedMessageRefCount = new HashMap<>();

        // The callback collects a description of each transaction the journal reports as failed.
        // NOTE(review): the meaning of the trailing 'false' argument is not visible here - confirm against Journal.load.
        journal.load(records, preparedTransactions, (transactionID, failedRecords, recordsToDelete) -> {
            bufferFailingTransactions.append("Transaction ").append(transactionID).append(" failed with these records:\n");
            for (RecordInfo info : failedRecords) {
                bufferFailingTransactions.append("- ").append(describeRecord(info, safe)).append("\n");
            }
            for (RecordInfo info : recordsToDelete) {
                bufferFailingTransactions.append("- ").append(describeRecord(info, safe)).append(" <marked to delete>\n");
            }
        }, false);

        for (RecordInfo info : records) {
            PageSubscriptionCounterImpl subsCounter = null;
            long queueIDForCounter = 0;

            Object o = newObjectEncoding(info);
            final byte userRecordType = info.getUserRecordType();

            if (userRecordType == ADD_MESSAGE || userRecordType == ADD_MESSAGE_PROTOCOL) {
                messageCount++;
            } else if (userRecordType == ADD_LARGE_MESSAGE) {
                largeMessageCount++;
            } else if (userRecordType == JournalRecordIds.ADD_REF) {
                ReferenceDescribe ref = (ReferenceDescribe) o;
                messageRefCounts.computeIfAbsent(ref.refEncoding.queueID, k -> new Count(0)).value++;
            } else if (userRecordType == JournalRecordIds.ACKNOWLEDGE_REF) {
                AckDescribe ref = (AckDescribe) o;
                Count count = messageRefCounts.get(ref.refEncoding.queueID);
                if (count == null) {
                    // An ack for a queue with no reference seen so far registers the queue at 0
                    // rather than driving the tally negative.
                    messageRefCounts.put(ref.refEncoding.queueID, new Count(0));
                } else {
                    count.value--;
                }
            } else if (userRecordType == JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE) {
                PageCountRecord encoding = (PageCountRecord) o;
                queueIDForCounter = encoding.getQueueID();
                subsCounter = lookupCounter(counters, queueIDForCounter);
                subsCounter.loadValue(info.id, encoding.getValue(), encoding.getPersistentSize());
                subsCounter.processReload();
            } else if (userRecordType == JournalRecordIds.PAGE_CURSOR_COUNTER_INC) {
                PageCountRecordInc encoding = (PageCountRecordInc) o;
                queueIDForCounter = encoding.getQueueID();
                subsCounter = lookupCounter(counters, queueIDForCounter);
                subsCounter.loadInc(info.id, encoding.getValue(), encoding.getPersistentSize());
                subsCounter.processReload();
            } else if (userRecordType == QUEUE_BINDING_RECORD) {
                PersistentQueueBindingEncoding bindingEncoding = (PersistentQueueBindingEncoding) DescribeJournal.newObjectEncoding(info, null);
                if (bindings == null) {
                    bindings = new LinkedList<>();
                }
                bindings.add(bindingEncoding);
            }

            out.println(describeRecord(info, o, safe));

            // Only page-counter records set subsCounter; show the running value after this record.
            if (subsCounter != null) {
                out.println("##SubsCounter for queue=" + queueIDForCounter + ", value=" + subsCounter.getValue());
                out.println();
            }
        }

        if (!counters.isEmpty()) {
            out.println("### Page Counters");
            printCounters(out, counters);
        }

        out.println();
        out.println("### Prepared TX ###");

        for (PreparedTransactionInfo tx : preparedTransactions) {
            out.println(tx.getId());
            for (RecordInfo info : tx.getRecords()) {
                Object o = newObjectEncoding(info);
                out.println("- " + describeRecord(info, o, safe));

                final byte userRecordType = info.getUserRecordType();
                if (userRecordType == ADD_MESSAGE || userRecordType == ADD_MESSAGE_PROTOCOL) {
                    preparedMessageCount++;
                } else if (userRecordType == ADD_LARGE_MESSAGE) {
                    preparedLargeMessageCount++;
                } else if (userRecordType == ADD_REF) {
                    ReferenceDescribe ref = (ReferenceDescribe) o;
                    preparedMessageRefCount.computeIfAbsent(ref.refEncoding.queueID, k -> new Count(0)).value++;
                }
            }

            for (RecordInfo info : tx.getRecordsToDelete()) {
                out.println("- " + describeRecord(info, safe) + " <marked to delete>");
            }
        }

        String missingTX = bufferFailingTransactions.toString();
        if (!missingTX.isEmpty()) {
            out.println();
            out.println("### Failed Transactions (Missing commit/prepare/rollback record) ###");
        }
        // Printed unconditionally: yields an empty line when no transaction failed.
        out.println(missingTX);

        out.println("### Message Counts ###");
        out.println("message count=" + messageCount);
        out.println("large message count=" + largeMessageCount);

        out.println("message reference count");
        messageRefCounts.forEach((queueId, count) -> out.println("queue id " + queueId + ",count=" + count));

        out.println("prepared message count=" + preparedMessageCount);
        out.println("prepared large message count=" + preparedLargeMessageCount);

        out.println("prepared message reference count");
        preparedMessageRefCount.forEach((queueId, count) -> out.println("queue id " + queueId + ",count=" + count));
    } finally {
        // Stop the journal even when loading or describing a record throws, so it is never left started.
        journal.stop();
    }

    return new DescribeJournal(records, preparedTransactions).setBindingEncodings(bindings);
}