in activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp [1248:1319]
/**
 * Acknowledgement handling that runs after a dispatched message has been
 * handed to the client.
 *
 * Behaviour, in order of evaluation:
 *  - unconsumed message channel closed: nothing is done.
 *  - messageExpired: the message is acknowledged with ACK_TYPE_EXPIRED.
 *  - transacted session: nothing is done here.
 *  - auto-acknowledge (each): every entry in deliveredMessages is acked with
 *    ACK_TYPE_CONSUMED, either immediately or, when optimizeAcknowledge is
 *    enabled, only once isTimeForOptimizedAck() reports that an ack is due.
 *  - auto-acknowledge (batch): the ack is deferred through ackLater() with
 *    ACK_TYPE_CONSUMED.
 *  - client / individual acknowledge: if the message is still present in
 *    deliveredMessages it is passed to ackLater() with ACK_TYPE_DELIVERED.
 *
 * @param message
 *      The dispatch that was just delivered.
 * @param messageExpired
 *      True when the message was found to be expired rather than consumed.
 *
 * @throws ActiveMQException if acknowledgement fails; an IllegalStateException
 *         raised for an unrecognised session acknowledge mode is routed
 *         through the same AMQ_CATCH_* handlers below.
 */
void ActiveMQConsumerKernel::afterMessageIsConsumed(Pointer<MessageDispatch> message, bool messageExpired ) {

    try {

        // A closed channel means there is nothing left to acknowledge from here.
        if (this->internal->unconsumedMessages->isClosed()) {
            return;
        }

        if (messageExpired) {
            acknowledge(message, ActiveMQConstants::ACK_TYPE_EXPIRED);
            return;
        }

        // No acknowledgement is sent from this method for transacted sessions.
        if (session->isTransacted()) {
            return;
        }

        if (isAutoAcknowledgeEach()) {

            // Only the caller that flips deliveringAcks from false to true runs
            // the section below; any other caller skips it entirely.
            if (this->internal->deliveringAcks.compareAndSet(false, true)) {

                try {

                    synchronized(&this->internal->deliveredMessages) {
                        if (!this->internal->deliveredMessages.isEmpty()) {
                            if (this->internal->optimizeAcknowledge) {

                                this->internal->ackCounter++;

                                if (this->internal->isTimeForOptimizedAck(this->consumerInfo->getPrefetchSize())) {

                                    Pointer<MessageAck> ack =
                                        makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);

                                    if (ack != NULL) {
                                        this->internal->deliveredMessages.clear();
                                        this->internal->ackCounter = 0;
                                        this->session->sendAck(ack);
                                        this->internal->optimizeAckTimestamp = System::currentTimeMillis();
                                    }

                                    // As a further optimization, send the pending ack for expired
                                    // messages when there is one. This resets deliveredCounter to 0
                                    // so that we won't send standard acks with every message just
                                    // because deliveredCounter is just below 0.5 * prefetch, the
                                    // threshold used in ackLater().
                                    if (this->internal->pendingAck != NULL && this->internal->deliveredCounter > 0) {
                                        this->session->sendAck(this->internal->pendingAck);
                                        this->internal->pendingAck.reset(NULL);
                                        this->internal->deliveredCounter = 0;
                                    }
                                }

                            } else {

                                Pointer<MessageAck> ack =
                                    makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);

                                if (ack != NULL) {
                                    this->internal->deliveredMessages.clear();
                                    session->sendAck(ack);
                                }
                            }
                        }
                    }

                } catch (...) {
                    // A failure while building or sending an ack must not leave the
                    // flag latched at true, otherwise every later call would fail the
                    // compareAndSet above and never acknowledge again. The exception
                    // is rethrown unchanged for the handlers at the end of the method.
                    this->internal->deliveringAcks.set(false);
                    throw;
                }

                this->internal->deliveringAcks.set(false);
            }

        } else if (isAutoAcknowledgeBatch()) {

            ackLater(message, ActiveMQConstants::ACK_TYPE_CONSUMED);

        } else if (session->isClientAcknowledge() || session->isIndividualAcknowledge()) {

            // Check membership under the deliveredMessages monitor, but call
            // ackLater() only after the monitor has been released.
            bool messageUnackedByConsumer = false;

            synchronized(&this->internal->deliveredMessages) {
                messageUnackedByConsumer = this->internal->deliveredMessages.contains(message);
            }

            if (messageUnackedByConsumer) {
                this->ackLater(message, ActiveMQConstants::ACK_TYPE_DELIVERED);
            }

        } else {
            throw IllegalStateException(__FILE__, __LINE__, "Invalid Session State");
        }
    }
    AMQ_CATCH_RETHROW(ActiveMQException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
    AMQ_CATCHALL_THROW(ActiveMQException)
}