in broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java [285:429]
/**
 * Replays one stored distributed-transaction (XA) record into the virtual host's DTX registry.
 * <p>
 * The branch identified by the stored Xid is looked up in the registry and, if absent, created
 * and registered. Every enqueue and dequeue record is then re-applied to that branch, after which
 * the branch is put into {@link DtxBranch.State#PREPARED} and {@code prePrepareTransaction()} is
 * invoked on it.
 * <p>
 * Records that reference a queue or message which cannot be resolved are not applied; an
 * operational log message is emitted for each of them instead.
 *
 * @param storedXid the stored Xid record supplying format, global id and branch id
 * @param enqueues  enqueue records belonging to the transaction
 * @param dequeues  dequeue records belonging to the transaction
 * @return the current value of {@code _continueRecovery}; presumably tells the store reader
 *         whether to keep visiting further records - confirm against the visitor contract
 * @throws ServerScopedRuntimeException if a queue entry referenced by a dequeue record cannot be
 *                                      acquired
 */
public boolean handle(final Transaction.StoredXidRecord storedXid,
                      final Transaction.EnqueueRecord[] enqueues,
                      final Transaction.DequeueRecord[] dequeues)
{
    final Xid id = new Xid(storedXid.getFormat(), storedXid.getGlobalId(), storedXid.getBranchId());
    final DtxRegistry dtxRegistry = getVirtualHost().getDtxRegistry();

    // Reuse the branch if the registry already knows this Xid, otherwise create and register it.
    DtxBranch branch = dtxRegistry.getBranch(id);
    if (branch == null)
    {
        branch = new DtxBranch(storedXid, dtxRegistry);
        dtxRegistry.registerBranch(branch);
    }

    for (final Transaction.EnqueueRecord record : enqueues)
    {
        restoreBranchEnqueue(branch, id, record);
    }

    for (final Transaction.DequeueRecord record : dequeues)
    {
        restoreBranchDequeue(branch, id, record);
    }

    branch.setState(DtxBranch.State.PREPARED);
    branch.prePrepareTransaction();
    return _continueRecovery.get();
}

/**
 * Re-applies a single stored enqueue record to the given branch, or logs why it cannot be applied.
 */
private void restoreBranchEnqueue(final DtxBranch branch,
                                  final Xid id,
                                  final Transaction.EnqueueRecord record)
{
    final Queue<?> queue = getVirtualHost().getAttainedQueue(record.getResource().getId());
    if (queue == null)
    {
        reportIncompleteQueue(id, record.getResource().getId().toString());
        return;
    }

    final long messageId = record.getMessage().getMessageNumber();
    final ServerMessage<?> message = getRecoveredMessage(messageId);
    if (message == null)
    {
        reportIncompleteMessage(id, messageId);
        return;
    }

    // Hold a reference on the message until the branch is either committed or rolled back.
    final MessageReference<?> ref = message.newReference();
    // One-element holder so the post-commit action can see the record handed to the callback.
    final MessageEnqueueRecord[] records = new MessageEnqueueRecord[1];

    branch.enqueue(queue, message, enqueueRecord -> records[0] = enqueueRecord);
    branch.addPostTransactionAction(new ServerTransaction.Action()
    {
        @Override
        public void postCommit()
        {
            queue.enqueue(message, null, records[0]);
            ref.release();
        }

        @Override
        public void onRollback()
        {
            ref.release();
        }
    });
}

/**
 * Re-applies a single stored dequeue record to the given branch, or logs why it cannot be applied.
 */
private void restoreBranchDequeue(final DtxBranch branch,
                                  final Xid id,
                                  final Transaction.DequeueRecord record)
{
    final Queue<?> queue = getVirtualHost().getAttainedQueue(record.getEnqueueRecord().getQueueId());
    if (queue == null)
    {
        reportIncompleteQueue(id, record.getEnqueueRecord().getQueueId().toString());
        return;
    }

    // For DTX to work correctly the queues which have uncommitted branches with dequeues
    // must be synchronously recovered
    if (isRecovering(queue))
    {
        recoverQueue(queue);
    }

    final long messageId = record.getEnqueueRecord().getMessageNumber();
    final ServerMessage<?> message = getRecoveredMessage(messageId);
    if (message == null)
    {
        reportIncompleteMessage(id, messageId);
        return;
    }

    // NOTE(review): entry is dereferenced without a null check - confirm that
    // getMessageOnTheQueue cannot return null for a recovered message at this point.
    final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
    if (!entry.acquire())
    {
        // Should never happen - dtx recovery is always synchronous and occurs before
        // any other message actors are allowed to act on the virtualhost.
        throw new ServerScopedRuntimeException(
                "Distributed transaction dequeue handler failed to acquire " + entry +
                " during recovery of queue " + queue);
    }

    branch.dequeue(entry.getEnqueueRecord());
    branch.addPostTransactionAction(new ServerTransaction.Action()
    {
        @Override
        public void postCommit()
        {
            entry.delete();
        }

        @Override
        public void onRollback()
        {
            entry.release();
        }
    });
}

/**
 * Emits the XA_INCOMPLETE_MESSAGE operational log entry for the given Xid and message number.
 */
private void reportIncompleteMessage(final Xid id, final long messageId)
{
    final StringBuilder xidString = xidAsString(id);
    getEventLogger().message(getLogSubject(),
                             TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
                                                                          Long.toString(messageId)));
}

/**
 * Emits the XA_INCOMPLETE_QUEUE operational log entry for the given Xid and queue identifier.
 */
private void reportIncompleteQueue(final Xid id, final String queueId)
{
    final StringBuilder xidString = xidAsString(id);
    getEventLogger().message(getLogSubject(),
                             TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), queueId));
}