in artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java [1841:2025]
private void loadSinglePreparedTransaction(PostOffice postOffice,
                                           PagingManager pagingManager,
                                           ResourceManager resourceManager,
                                           Map<Long, QueueBindingInfo> queueInfos,
                                           Map<Long, PageSubscription> pageSubscriptions,
                                           Set<Pair<Long, Long>> pendingLargeMessages,
                                           final Set<Long> storedLargeMessages,
                                           JournalLoader journalLoader,
                                           CoreMessageObjectPools pools,
                                           PreparedTransactionInfo preparedTransaction) throws Exception {
   XidEncoding encodingXid = new XidEncoding(preparedTransaction.getExtraData());
   Xid xid = encodingXid.xid;
   Transaction tx = new TransactionImpl(preparedTransaction.getId(), xid, this);
   List<MessageReference> referencesToAck = new ArrayList<>();
   Map<Long, Message> messages = new HashMap<>();
   // Use the same method as the message-journal load to prune out acks, so they don't get added.
   // Then have reacknowledge(tx) methods on the queue, which need to add the page size.
   // First, get any sent messages for this tx and recreate them.
   for (RecordInfo record : preparedTransaction.getRecords()) {
      byte[] data = record.data;
      ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
      byte recordType = record.getUserRecordType();
      switch (recordType) {
         case JournalRecordIds.ADD_LARGE_MESSAGE: {
            if (storedLargeMessages != null && storedLargeMessages.remove(record.id)) {
               if (logger.isDebugEnabled()) {
                  logger.debug("PreparedTX/AddLargeMessage load removing stored large message {}", record.id);
               }
            }
            messages.put(record.id, parseLargeMessage(buff).toMessage());
            break;
         }
         case JournalRecordIds.ADD_MESSAGE: {
            break;
         }
         case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
            Message message = decodeMessage(pools, buff);
            if (storedLargeMessages != null && message.isLargeMessage() && storedLargeMessages.remove(record.id)) {
               logger.debug("PreparedTX/AddMessageProtocol load removing stored large message {}", record.id);
            }
            messages.put(record.id, message);
            break;
         }
         case JournalRecordIds.ADD_REF: {
            long messageID = record.id;
            RefEncoding encoding = new RefEncoding();
            encoding.decode(buff);
            Message message = messages.get(messageID);
            if (message == null) {
               throw new IllegalStateException("Cannot find message with id " + messageID);
            }
            journalLoader.handlePreparedSendMessage(message, tx, encoding.queueID);
            break;
         }
         case JournalRecordIds.ACKNOWLEDGE_REF: {
            long messageID = record.id;
            RefEncoding encoding = new RefEncoding();
            encoding.decode(buff);
            journalLoader.handlePreparedAcknowledge(messageID, referencesToAck, encoding.queueID);
            break;
         }
         case JournalRecordIds.PAGE_TRANSACTION: {
            PageTransactionInfo pageTransactionInfo = new PageTransactionInfoImpl();
            pageTransactionInfo.decode(buff);
            if (record.isUpdate) {
               PageTransactionInfo pgTX = pagingManager.getTransaction(pageTransactionInfo.getTransactionID());
               if (pgTX != null) {
                  pgTX.reloadUpdate(this, pagingManager, tx, pageTransactionInfo.getNumberOfMessages());
               }
            } else {
               pageTransactionInfo.reloadPrepared(tx);
               tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);
               pagingManager.addTransaction(pageTransactionInfo);
               tx.addOperation(new FinishPageMessageOperation());
            }
            break;
         }
         case SET_SCHEDULED_DELIVERY_TIME: {
            // Do nothing - for prepared txs, the set scheduled delivery time will only occur in a send in which
            // case the message will already have the header for the scheduled delivery time, so no need to do
            // anything.
            break;
         }
         case DUPLICATE_ID: {
            // We need to load the duplicate IDs at prepare time too
            DuplicateIDEncoding encoding = new DuplicateIDEncoding();
            encoding.decode(buff);
            DuplicateIDCache cache = postOffice.getDuplicateIDCache(encoding.address);
            cache.load(tx, encoding.duplID);
            break;
         }
         case ACKNOWLEDGE_CURSOR: {
            CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
            encoding.decode(buff);
            encoding.position.setRecordID(record.id);
            PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
            if (sub != null) {
               sub.reloadPreparedACK(tx, encoding.position);
               referencesToAck.add(new QueryPagedReferenceImpl(encoding.position, null, sub));
            } else {
               ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.queueID);
            }
            break;
         }
         case PAGE_CURSOR_COUNTER_INC: {
            PageCountRecordInc encoding = new PageCountRecordInc();
            encoding.decode(buff);
            logger.debug("Page cursor counter inc on a prepared TX.");
            // TODO: do I need to remove the record on commit?
            break;
         }
         default: {
            ActiveMQServerLogger.LOGGER.journalInvalidRecordType(recordType);
         }
      }
   }
   for (RecordInfo recordDeleted : preparedTransaction.getRecordsToDelete()) {
      byte[] data = recordDeleted.data;
      if (data.length > 0) {
         ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
         byte b = buff.readByte();
         switch (b) {
            case ADD_LARGE_MESSAGE_PENDING: {
               long messageID = buff.readLong();
               if (!pendingLargeMessages.remove(new Pair<>(recordDeleted.id, messageID))) {
                  ActiveMQServerLogger.LOGGER.largeMessageNotFound(recordDeleted.id);
               }
               installLargeMessageConfirmationOnTX(tx, recordDeleted.id);
               break;
            }
            default:
               ActiveMQServerLogger.LOGGER.journalInvalidRecordTypeOnPreparedTX(b);
         }
      }
   }
   journalLoader.handlePreparedTransaction(tx, referencesToAck, xid, resourceManager);
}
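
For context: the loader above deliberately rebuilds each prepared transaction under its original Xid, so that an external transaction manager can finish the branch during XA recovery rather than the broker deciding its outcome. The sketch below is not Artemis code; it only illustrates, with the standard javax.transaction.xa API, how a transaction manager typically resolves such in-doubt branches. The committedByTm set is a hypothetical stand-in for the TM's own decision log, and Xid matching relies on the Xid implementation's equals/hashCode.

```java
import java.util.Set;

import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

public final class RecoveryPassSketch {

   // Resolves in-doubt branches reported by the resource: branches the TM had decided to
   // commit are committed (second phase of 2PC), everything else is rolled back.
   public static void resolveInDoubt(XAResource resource, Set<Xid> committedByTm) throws XAException {
      Xid[] inDoubt = resource.recover(XAResource.TMSTARTRSCAN | XAResource.TMENDRSCAN);
      for (Xid xid : inDoubt) {
         if (committedByTm.contains(xid)) {
            resource.commit(xid, false);   // onePhase = false: this is the second phase of 2PC
         } else {
            resource.rollback(xid);
         }
      }
   }
}
```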