in activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java [193:396]
/**
 * Processes an acknowledgment from the consumer against this subscription's
 * {@code dispatched} list and then, if the ack was correlated with a dispatched
 * message, wakes the destination and dispatches pending messages.
 * <p>
 * The ack is ignored (with a warning) when {@code okForAckAsDispatchDone.await}
 * with a zero timeout returns {@code false}. Otherwise the work is done while
 * holding {@code dispatchLock}, and the branch taken depends on the ack type:
 * <ul>
 * <li><b>standard</b> - every dispatched message from the first to the last
 * message id is acknowledged; outside a transaction the nodes are removed from
 * {@code dispatched}, inside one a remove-sync is registered instead. An
 * uncorrelated ack is only logged.</li>
 * <li><b>individual</b> - only the message whose id equals the last message id
 * is acknowledged.</li>
 * <li><b>delivered</b> - nothing is removed; the prefetch extension is expanded
 * by the ack's message count.</li>
 * <li><b>expired</b> - every message in range is removed from
 * {@code dispatched}; those the broker reports as expired are passed to
 * {@code processExpiredAck}.</li>
 * <li><b>redelivered</b> - only correlates the ack with a dispatched
 * message.</li>
 * <li><b>poison</b> - every message in range is sent to the DLQ, acknowledged
 * and removed from {@code dispatched}.</li>
 * </ul>
 *
 * @param context the connection context the ack arrived on; its transaction
 *                state decides whether nodes are removed immediately
 * @param ack     the acknowledgment to apply
 * @throws JMSException if a delivered, expired, redelivered or poison ack cannot
 *                      be correlated with a dispatched message, or if a poison
 *                      ack is transacted
 * @throws Exception    propagated from the helpers invoked while processing
 */
public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
    // Set by whichever branch manages to correlate the ack with a dispatched message.
    boolean callDispatchMatched = false;
    Destination destination = null;
    // Zero timeout: a non-blocking check, never a wait.
    if (!okForAckAsDispatchDone.await(0L, TimeUnit.MILLISECONDS)) {
        // suppress unexpected ack exception in this expected case
        LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: {}", ack);
        return;
    }
    LOG.trace("ack: {}", ack);
    synchronized(dispatchLock) {
        if (ack.isStandardAck()) {
            // First check if the ack matches the dispatched. When using failover this might
            // not be the case. We don't ever want to ack the wrong messages.
            assertAckMatchesDispatched(ack);
            // Acknowledge all dispatched messages up till the message id of
            // the acknowledgment.
            boolean inAckRange = false;
            // Removal is deferred so that 'dispatched' is not modified while iterating it.
            List<MessageReference> removeList = new ArrayList<MessageReference>();
            for (final MessageReference node : dispatched) {
                MessageId messageId = node.getMessageId();
                // A null first id means the range starts at the head of 'dispatched'.
                if (ack.getFirstMessageId() == null
                        || ack.getFirstMessageId().equals(messageId)) {
                    inAckRange = true;
                }
                if (inAckRange) {
                    // Don't remove the nodes until we are committed.
                    if (!context.isInTransaction()) {
                        getSubscriptionStatistics().getDequeues().increment();
                        removeList.add(node);
                        contractPrefetchExtension(1);
                    } else {
                        registerRemoveSync(context, node);
                    }
                    acknowledge(context, ack, node);
                    if (ack.getLastMessageId().equals(messageId)) {
                        destination = (Destination) node.getRegionDestination();
                        callDispatchMatched = true;
                        break;
                    }
                }
            }
            for (final MessageReference node : removeList) {
                dispatched.remove(node);
                decrementPrefetchCounter(node);
            }
            // this only happens after a reconnect - get an ack which is not
            // valid
            if (!callDispatchMatched) {
                LOG.warn("Could not correlate acknowledgment with dispatched message: {}", ack);
            }
        } else if (ack.isIndividualAck()) {
            // Message was delivered and acknowledge - but only delete the
            // individual message
            for (final MessageReference node : dispatched) {
                MessageId messageId = node.getMessageId();
                if (ack.getLastMessageId().equals(messageId)) {
                    // Don't remove the nodes until we are committed - immediateAck option
                    if (!context.isInTransaction()) {
                        getSubscriptionStatistics().getDequeues().increment();
                        // Removing inside the for-each is tolerable only because the
                        // loop is left via 'break' before the iterator is used again.
                        dispatched.remove(node);
                        decrementPrefetchCounter(node);
                        contractPrefetchExtension(1);
                    } else {
                        registerRemoveSync(context, node);
                        expandPrefetchExtension(1);
                    }
                    acknowledge(context, ack, node);
                    destination = (Destination) node.getRegionDestination();
                    callDispatchMatched = true;
                    break;
                }
            }
        } else if (ack.isDeliveredAck()) {
            // Message was delivered but not acknowledged: update pre-fetch
            // counters.
            for (final MessageReference node : dispatched) {
                Destination nodeDest = (Destination) node.getRegionDestination();
                if (ack.getLastMessageId().equals(node.getMessageId())) {
                    expandPrefetchExtension(ack.getMessageCount());
                    destination = nodeDest;
                    callDispatchMatched = true;
                    break;
                }
            }
            if (!callDispatchMatched) {
                throw new JMSException(
                        "Could not correlate acknowledgment with dispatched message: "
                                + ack);
            }
        } else if (ack.isExpiredAck()) {
            // Message was expired
            boolean inAckRange = false;
            // An explicit iterator is used so in-range nodes can be removed while iterating.
            for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();) {
                final MessageReference node = iter.next();
                Destination nodeDest = (Destination) node.getRegionDestination();
                MessageId messageId = node.getMessageId();
                if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
                    inAckRange = true;
                }
                if (inAckRange) {
                    // The node is dropped from 'dispatched' either way; expiry handling
                    // only runs when the broker itself considers the message expired.
                    if (broker.isExpired(node)) {
                        processExpiredAck(context, nodeDest, node);
                    }
                    iter.remove();
                    decrementPrefetchCounter(node);
                    if (ack.getLastMessageId().equals(messageId)) {
                        contractPrefetchExtension(1);
                        destination = (Destination) node.getRegionDestination();
                        callDispatchMatched = true;
                        break;
                    }
                }
            }
            if (!callDispatchMatched) {
                throw new JMSException(
                        "Could not correlate expiration acknowledgment with dispatched message: "
                                + ack);
            }
        } else if (ack.isRedeliveredAck()) {
            // Message was re-delivered but it was not yet considered to be
            // a DLQ message.
            boolean inAckRange = false;
            for (final MessageReference node : dispatched) {
                MessageId messageId = node.getMessageId();
                if (ack.getFirstMessageId() == null
                        || ack.getFirstMessageId().equals(messageId)) {
                    inAckRange = true;
                }
                if (inAckRange) {
                    if (ack.getLastMessageId().equals(messageId)) {
                        destination = (Destination) node.getRegionDestination();
                        callDispatchMatched = true;
                        break;
                    }
                }
            }
            if (!callDispatchMatched) {
                throw new JMSException(
                        "Could not correlate acknowledgment with dispatched message: "
                                + ack);
            }
        } else if (ack.isPoisonAck()) {
            // TODO: what if the message is already in a DLQ???
            // Handle the poison ACK case: we need to send the message to a
            // DLQ
            if (ack.isInTransaction()) {
                throw new JMSException("Poison ack cannot be transacted: "
                        + ack);
            }
            // Number of messages sent to the DLQ by this ack.
            int index = 0;
            boolean inAckRange = false;
            // Removal is deferred so that 'dispatched' is not modified while iterating it.
            List<MessageReference> removeList = new ArrayList<MessageReference>();
            for (final MessageReference node : dispatched) {
                MessageId messageId = node.getMessageId();
                if (ack.getFirstMessageId() == null
                        || ack.getFirstMessageId().equals(messageId)) {
                    inAckRange = true;
                }
                if (inAckRange) {
                    sendToDLQ(context, node, ack.getPoisonCause());
                    Destination nodeDest = (Destination) node.getRegionDestination();
                    removeList.add(node);
                    getSubscriptionStatistics().getDequeues().increment();
                    index++;
                    acknowledge(context, ack, node);
                    if (ack.getLastMessageId().equals(messageId)) {
                        // Contract by the number of messages handled by this ack, in line
                        // with the one-per-message contraction done for standard acks;
                        // a fixed 1 under-counted ranged poison acks.
                        contractPrefetchExtension(index);
                        destination = nodeDest;
                        callDispatchMatched = true;
                        break;
                    }
                }
            }
            for (final MessageReference node : removeList) {
                dispatched.remove(node);
                decrementPrefetchCounter(node);
            }
            if (!callDispatchMatched) {
                throw new JMSException(
                        "Could not correlate acknowledgment with dispatched message: "
                                + ack);
            }
        }
    }
    // Outside dispatchLock: wake the destination and push out anything pending.
    if (callDispatchMatched && destination != null) {
        destination.wakeup();
        dispatchPending();
        if (pending.isEmpty()) {
            wakeupDestinationsForDispatch();
        }
    } else {
        LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): {}", ack);
    }
}