private void loadSinglePreparedTransaction()

in artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java [1841:2025]


   /**
    * Rebuilds a single prepared transaction from its journal records and hands it over to the
    * {@code journalLoader}.
    * <p>
    * A {@code TransactionImpl} is created from the prepared transaction's id and the {@code Xid} decoded from
    * its extra data. The transaction's records are then replayed in the order they are returned, after that the
    * records marked for deletion are processed, and finally {@code journalLoader.handlePreparedTransaction} is
    * invoked with the transaction and every reference collected for acknowledgement.
    *
    * @param postOffice           supplies the duplicate-ID cache when a {@code DUPLICATE_ID} record is loaded
    * @param pagingManager        consulted and updated for {@code PAGE_TRANSACTION} records and passed to
    *                             {@code locateSubscription}
    * @param resourceManager      passed through to {@code journalLoader.handlePreparedTransaction}
    * @param queueInfos           passed to {@code locateSubscription} for {@code ACKNOWLEDGE_CURSOR} records
    * @param pageSubscriptions    passed to {@code locateSubscription} for {@code ACKNOWLEDGE_CURSOR} records
    * @param pendingLargeMessages (record id, message id) pairs; the matching pair is removed for every
    *                             {@code ADD_LARGE_MESSAGE_PENDING} record that is marked for deletion
    * @param storedLargeMessages  may be {@code null}; otherwise the ids of large-message records found in this
    *                             transaction are removed from it
    * @param journalLoader        receives the prepared sends, the prepared acknowledgements and the rebuilt
    *                             transaction
    * @param pools                passed to {@code decodeMessage}
    * @param preparedTransaction  provides the id, the extra data, the records and the records to delete
    * @throws IllegalStateException if an {@code ADD_REF} record refers to a message that was not added by an
    *                               earlier record of this same transaction
    * @throws Exception             anything raised while decoding or by the collaborators is propagated
    */
   private void loadSinglePreparedTransaction(PostOffice postOffice,
                          PagingManager pagingManager,
                          ResourceManager resourceManager,
                          Map<Long, QueueBindingInfo> queueInfos,
                          Map<Long, PageSubscription> pageSubscriptions,
                          Set<Pair<Long, Long>> pendingLargeMessages,
                          final Set<Long> storedLargeMessages,
                          JournalLoader journalLoader,
                          CoreMessageObjectPools pools,
                          PreparedTransactionInfo preparedTransaction) throws Exception {
      final Xid xid = new XidEncoding(preparedTransaction.getExtraData()).xid;
      final Transaction tx = new TransactionImpl(preparedTransaction.getId(), xid, this);

      // Messages added by this transaction, keyed by the id of the record that carried them.
      final Map<Long, Message> addedMessages = new HashMap<>();
      // References that are handed to the journal loader together with the transaction at the end.
      final List<MessageReference> pendingAcks = new ArrayList<>();

      // Use same method as load message journal to prune out acks, so they don't get added.
      // Then have reacknowledge(tx) methods on queue, which needs to add the page size

      // Pass 1: replay every record of the transaction, in order.
      for (RecordInfo entry : preparedTransaction.getRecords()) {
         final ActiveMQBuffer payload = ActiveMQBuffers.wrappedBuffer(entry.data);
         final byte type = entry.getUserRecordType();
         final long entryId = entry.id;

         switch (type) {
            case JournalRecordIds.ADD_LARGE_MESSAGE: {
               // The id is removed from the set regardless of the log level; only the log call depends on it.
               if (storedLargeMessages != null && storedLargeMessages.remove(entryId) && logger.isDebugEnabled()) {
                  logger.debug("PreparedTX/AddLargeMessage load removing stored large message {}", entryId);
               }
               addedMessages.put(entryId, parseLargeMessage(payload).toMessage());
               break;
            }
            case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
               final Message decoded = decodeMessage(pools, payload);
               if (storedLargeMessages != null && decoded.isLargeMessage() && storedLargeMessages.remove(entryId)) {
                  logger.debug("PreparedTX/AddMessgeProtocol load removing stored large message {}", entryId);
               }
               addedMessages.put(entryId, decoded);
               break;
            }
            case JournalRecordIds.ADD_REF: {
               final RefEncoding ref = new RefEncoding();
               ref.decode(payload);

               // The referenced message must have been added by an earlier record of this transaction.
               final Message target = addedMessages.get(entryId);
               if (target == null) {
                  throw new IllegalStateException("Cannot find message with id " + entryId);
               }
               journalLoader.handlePreparedSendMessage(target, tx, ref.queueID);
               break;
            }
            case JournalRecordIds.ACKNOWLEDGE_REF: {
               final RefEncoding ref = new RefEncoding();
               ref.decode(payload);
               journalLoader.handlePreparedAcknowledge(entryId, pendingAcks, ref.queueID);
               break;
            }
            case JournalRecordIds.PAGE_TRANSACTION: {
               final PageTransactionInfo decodedInfo = new PageTransactionInfoImpl();
               decodedInfo.decode(payload);

               if (!entry.isUpdate) {
                  // First record for this page transaction: attach it to the tx and register it.
                  decodedInfo.reloadPrepared(tx);
                  tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, decodedInfo);
                  pagingManager.addTransaction(decodedInfo);
                  tx.addOperation(new FinishPageMessageOperation());
                  break;
               }

               // Update record: applied only when the paging manager already knows the page transaction.
               final PageTransactionInfo known = pagingManager.getTransaction(decodedInfo.getTransactionID());
               if (known != null) {
                  known.reloadUpdate(this, pagingManager, tx, decodedInfo.getNumberOfMessages());
               }
               break;
            }
            case JournalRecordIds.ADD_MESSAGE:
            case SET_SCHEDULED_DELIVERY_TIME: {
               // ADD_MESSAGE records are skipped here without being decoded.
               // SET_SCHEDULED_DELIVERY_TIME: do nothing - for prepared txs, the set scheduled delivery time
               // will only occur in a send in which case the message will already have the header for the
               // scheduled delivery time, so no need to do anything.
               break;
            }
            case DUPLICATE_ID: {
               // We need load the duplicate ids at prepare time too
               final DuplicateIDEncoding duplicate = new DuplicateIDEncoding();
               duplicate.decode(payload);
               postOffice.getDuplicateIDCache(duplicate.address).load(tx, duplicate.duplID);
               break;
            }
            case ACKNOWLEDGE_CURSOR: {
               final CursorAckRecordEncoding cursorAck = new CursorAckRecordEncoding();
               cursorAck.decode(payload);
               cursorAck.position.setRecordID(entryId);

               final PageSubscription subscription = locateSubscription(cursorAck.queueID, pageSubscriptions, queueInfos, pagingManager);
               if (subscription == null) {
                  ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(cursorAck.queueID);
                  break;
               }

               subscription.reloadPreparedACK(tx, cursorAck.position);
               pendingAcks.add(new QueryPagedReferenceImpl(cursorAck.position, null, subscription));
               break;
            }
            case PAGE_CURSOR_COUNTER_INC: {
               // The record is decoded, but nothing from it is applied to the transaction here.
               new PageCountRecordInc().decode(payload);

               logger.debug("Page cursor counter inc on a prepared TX.");

               // TODO: do I need to remove the record on commit?
               break;
            }
            default: {
               ActiveMQServerLogger.LOGGER.journalInvalidRecordType(type);
               break;
            }
         }
      }

      // Pass 2: records that the prepared transaction marked for deletion.
      for (RecordInfo removed : preparedTransaction.getRecordsToDelete()) {
         final byte[] raw = removed.data;
         if (raw.length == 0) {
            // Nothing to inspect for an empty payload.
            continue;
         }

         final ActiveMQBuffer payload = ActiveMQBuffers.wrappedBuffer(raw);
         final byte marker = payload.readByte();
         if (marker != ADD_LARGE_MESSAGE_PENDING) {
            ActiveMQServerLogger.LOGGER.journalInvalidRecordTypeOnPreparedTX(marker);
            continue;
         }

         final long messageID = payload.readLong();
         if (!pendingLargeMessages.remove(new Pair<>(removed.id, messageID))) {
            ActiveMQServerLogger.LOGGER.largeMessageNotFound(removed.id);
         }
         // Installed whether or not the pending entry was found.
         installLargeMessageConfirmationOnTX(tx, removed.id);
      }

      journalLoader.handlePreparedTransaction(tx, pendingAcks, xid, resourceManager);
   }