in broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java [449:620]
/**
 * Handles a disposition received for the delivery tracked by this object.
 *
 * <p>The delivery state is examined in this order:
 * <ol>
 *   <li>a {@code TransactionalState}: the transaction is looked up by its id on the link
 *       endpoint and the state's outcome is enlisted in it;</li>
 *   <li>a plain {@code Outcome}: the outcome is enlisted in the link endpoint's
 *       async auto-commit transaction;</li>
 *   <li>anything else (including {@code null}): nothing is done.</li>
 * </ol>
 *
 * @param state   the delivery state carried by the disposition
 * @param settled whether the disposition was flagged as settled; only {@link Boolean#TRUE}
 *                is treated as settled, {@code null} counts as not settled
 * @return {@code true} only when an outcome was present and no transaction id was supplied;
 *         {@code false} otherwise, including when the transaction id is unknown
 */
public boolean process(DeliveryState state, final Boolean settled)
{
    final Binary transactionId;
    final Outcome outcome;
    ServerTransaction txn;

    // NOTE(review): open question carried over from the original code -
    // if the disposition is settled, does that override the txn?
    if (state instanceof TransactionalState)
    {
        final TransactionalState transactionalState = (TransactionalState) state;
        transactionId = transactionalState.getTxnId();
        outcome = transactionalState.getOutcome();
        try
        {
            txn = _linkEndpoint.getTransaction(transactionId);
            getSession().getConnection().registerTransactedMessageDelivered();

            final TransactionLogResource owningResource = _queueEntry.getOwningResource();
            if (owningResource instanceof TransactionMonitor)
            {
                ((TransactionMonitor) owningResource).registerTransaction(txn);
            }
        }
        catch (UnknownTransactionException e)
        {
            // Unknown transaction id: close the endpoint with an error, apply the
            // modified outcome to the entry and report the disposition as not processed.
            getEndpoint().close(new Error(TransactionError.UNKNOWN_ID, e.getMessage()));
            applyModifiedOutcome();
            return false;
        }
    }
    else if (state instanceof Outcome)
    {
        transactionId = null;
        outcome = (Outcome) state;
        txn = _linkEndpoint.getAsyncAutoCommitTransaction();
    }
    else
    {
        // Neither transactional nor an outcome: there is nothing to enlist.
        return false;
    }

    if (txn != null)
    {
        if (outcome instanceof Accepted)
        {
            enlistAccepted(txn, outcome, settled);
        }
        else if (outcome instanceof Released)
        {
            enlistReleased(txn);
        }
        else if (outcome instanceof Modified)
        {
            enlistModified(txn, (Modified) outcome, settled);
        }
        else if (outcome instanceof Rejected)
        {
            enlistRejected(txn, settled);
        }
    }

    // The transaction id is tested rather than the state type because a
    // transactional state may itself carry a null id.
    return transactionId == null && outcome != null;
}

/**
 * Enlists the work for an accepted outcome: a dequeue of the entry (only if its acquisition
 * could be made unstealable) followed by a post-transaction action that settles the delivery
 * or echoes the outcome back.
 */
private void enlistAccepted(final ServerTransaction txn, final Outcome outcome, final Boolean settled)
{
    final boolean settledByPeer = Boolean.TRUE.equals(settled);

    if (_queueEntry.makeAcquisitionUnstealable(getConsumer()))
    {
        final ServerTransaction.Action deleteOnCommit = new ServerTransaction.Action()
        {
            @Override
            public void postCommit()
            {
                // Only delete while this consumer still holds the acquisition.
                if (_queueEntry.isAcquiredBy(getConsumer()))
                {
                    _queueEntry.delete();
                }
            }

            @Override
            public void onRollback()
            {
            }
        };
        txn.dequeue(_queueEntry.getEnqueueRecord(), deleteOnCommit);
    }

    txn.addPostTransactionAction(new ServerTransaction.Action()
    {
        @Override
        public void postCommit()
        {
            if (settledByPeer)
            {
                _linkEndpoint.settle(_deliveryTag);
            }
            else
            {
                _linkEndpoint.updateDisposition(_deliveryTag, outcome, true);
            }
        }

        @Override
        public void onRollback()
        {
            if (settledByPeer)
            {
                // TODO: apply source's default outcome
                applyModifiedOutcome();
            }
        }
    });
}

/**
 * Enlists the work for a released outcome: on commit the entry is released and the delivery
 * settled; on rollback the delivery is settled only.
 */
private void enlistReleased(final ServerTransaction txn)
{
    txn.addPostTransactionAction(new ServerTransaction.Action()
    {
        @Override
        public void postCommit()
        {
            _queueEntry.release(getConsumer());
            _linkEndpoint.settle(_deliveryTag);
        }

        @Override
        public void onRollback()
        {
            _linkEndpoint.settle(_deliveryTag);
            // TODO: apply source's default outcome if settled
        }
    });
}

/**
 * Enlists the work for a modified outcome. The outcome's flags are read at commit time:
 * {@code undeliverableHere} rejects the entry for this consumer, {@code deliveryFailed}
 * selects between the delivery-count/alternate/discard handling and a plain release.
 */
private void enlistModified(final ServerTransaction txn, final Modified modified, final Boolean settled)
{
    final boolean settledByPeer = Boolean.TRUE.equals(settled);

    txn.addPostTransactionAction(new ServerTransaction.Action()
    {
        @Override
        public void postCommit()
        {
            if (Boolean.TRUE.equals(modified.getUndeliverableHere()))
            {
                _queueEntry.reject(getConsumer());
            }

            if (Boolean.TRUE.equals(modified.getDeliveryFailed()))
            {
                incrementDeliveryCountOrRouteToAlternateOrDiscard();
            }
            else
            {
                _queueEntry.release(getConsumer());
            }

            _linkEndpoint.settle(_deliveryTag);
        }

        @Override
        public void onRollback()
        {
            if (settledByPeer)
            {
                // TODO: apply source's default outcome
                applyModifiedOutcome();
            }
        }
    });
}

/**
 * Enlists the work for a rejected outcome: on commit the delivery is settled, the entry goes
 * through the delivery-count/alternate/discard handling, and a conditional flow is sent.
 */
private void enlistRejected(final ServerTransaction txn, final Boolean settled)
{
    final boolean settledByPeer = Boolean.TRUE.equals(settled);

    txn.addPostTransactionAction(new ServerTransaction.Action()
    {
        @Override
        public void postCommit()
        {
            _linkEndpoint.settle(_deliveryTag);
            incrementDeliveryCountOrRouteToAlternateOrDiscard();
            _linkEndpoint.sendFlowConditional();
        }

        @Override
        public void onRollback()
        {
            if (settledByPeer)
            {
                // TODO: apply source's default outcome
                applyModifiedOutcome();
            }
        }
    });
}