in qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java [247:290]
public void acknowledge(ACK_TYPE ackType) {
LOG.trace("Session Acknowledge for consumer {} with ack type {}", getResourceInfo().getId(), ackType);
Delivery delivery = getEndpoint().head();
while (delivery != null) {
Delivery current = delivery;
delivery = delivery.next();
if (!(current.getContext() instanceof JmsInboundMessageDispatch)) {
LOG.debug("{} Found incomplete delivery with no context during session acknowledge processing", AmqpConsumer.this);
continue;
}
JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) current.getContext();
if (ackType == ACK_TYPE.SESSION_SHUTDOWN && (envelope.isDelivered() || envelope.isRecovered())) {
handleDisposition(envelope, current, MODIFIED_FAILED);
} else if (envelope.isDelivered()) {
final DeliveryState disposition;
switch (ackType) {
case ACCEPTED:
disposition = Accepted.getInstance();
break;
case RELEASED:
disposition = Released.getInstance();
break;
case REJECTED:
disposition = REJECTED;
break;
case MODIFIED_FAILED:
disposition = MODIFIED_FAILED;
break;
case MODIFIED_FAILED_UNDELIVERABLE:
disposition = MODIFIED_FAILED_UNDELIVERABLE;
break;
default:
throw new IllegalArgumentException("Invalid acknowledgement type specified: " + ackType);
}
handleDisposition(envelope, current, disposition);
}
}
tryCompleteDeferredClose();
}