public boolean handle()

in broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java [285:429]


            public boolean handle(final Transaction.StoredXidRecord storedXid,
                                  final Transaction.EnqueueRecord[] enqueues,
                                  final Transaction.DequeueRecord[] dequeues)
            {
                Xid id = new Xid(storedXid.getFormat(), storedXid.getGlobalId(), storedXid.getBranchId());
                DtxRegistry dtxRegistry = getVirtualHost().getDtxRegistry();
                DtxBranch branch = dtxRegistry.getBranch(id);
                if (branch == null)
                {
                    branch = new DtxBranch(storedXid, dtxRegistry);
                    dtxRegistry.registerBranch(branch);
                }
                for (Transaction.EnqueueRecord record : enqueues)
                {
                    final Queue<?> queue = getVirtualHost().getAttainedQueue(record.getResource().getId());
                    if (queue != null)
                    {
                        final long messageId = record.getMessage().getMessageNumber();
                        final ServerMessage<?> message = getRecoveredMessage(messageId);

                        if (message != null)
                        {
                            final MessageReference<?> ref = message.newReference();

                            final MessageEnqueueRecord[] records = new MessageEnqueueRecord[1];

                            branch.enqueue(queue, message, record1 -> records[0] = record1);
                            branch.addPostTransactionAction(new ServerTransaction.Action()
                            {
                                @Override
                                public void postCommit()
                                {
                                    queue.enqueue(message, null, records[0]);
                                    ref.release();
                                }

                                @Override
                                public void onRollback()
                                {
                                    ref.release();
                                }
                            });

                        }
                        else
                        {
                            StringBuilder xidString = xidAsString(id);
                            getEventLogger().message(getLogSubject(),
                                                            TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
                                                                                                         Long.toString(
                                                                                                                 messageId)));
                        }
                    }
                    else
                    {
                        StringBuilder xidString = xidAsString(id);
                        getEventLogger().message(getLogSubject(),
                                                        TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
                                                                                                   record.getResource()
                                                                                                           .getId()
                                                                                                           .toString()));

                    }
                }
                for (Transaction.DequeueRecord record : dequeues)
                {

                    final Queue<?> queue = getVirtualHost().getAttainedQueue(record.getEnqueueRecord().getQueueId());

                    if (queue != null)
                    {
                        // For DTX to work correctly the queues which have uncommitted branches with dequeues
                        // must be synchronously recovered

                        if (isRecovering(queue))
                        {
                            recoverQueue(queue);
                        }

                        final long messageId = record.getEnqueueRecord().getMessageNumber();
                        final ServerMessage<?> message = getRecoveredMessage(messageId);

                        if (message != null)
                        {
                            final QueueEntry entry = queue.getMessageOnTheQueue(messageId);

                            if (entry.acquire())
                            {
                                branch.dequeue(entry.getEnqueueRecord());

                                branch.addPostTransactionAction(new ServerTransaction.Action()
                                {

                                    @Override
                                    public void postCommit()
                                    {
                                        entry.delete();
                                    }

                                    @Override
                                    public void onRollback()
                                    {
                                        entry.release();
                                    }
                                });
                            }
                            else
                            {
                                // Should never happen - dtx recovery is always synchronous and occurs before
                                // any other message actors are allowed to act on the virtualhost.
                                throw new ServerScopedRuntimeException(
                                        "Distributed transaction dequeue handler failed to acquire " + entry +
                                        " during recovery of queue " + queue);

                            }
                        }
                        else
                        {
                            StringBuilder xidString = xidAsString(id);
                            getEventLogger().message(getLogSubject(),
                                                            TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
                                                                                                         Long.toString(
                                                                                                                 messageId)));

                        }

                    }
                    else
                    {
                        StringBuilder xidString = xidAsString(id);
                        getEventLogger().message(getLogSubject(),
                                                        TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
                                                                                                   record.getEnqueueRecord()
                                                                                                           .getQueueId()
                                                                                                           .toString()));
                    }

                }


                branch.setState(DtxBranch.State.PREPARED);
                branch.prePrepareTransaction();

                return _continueRecovery.get();
            }