void ActiveMQConsumerKernel::afterMessageIsConsumed()

in activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp [1248:1319]


void ActiveMQConsumerKernel::afterMessageIsConsumed(Pointer<MessageDispatch> message, bool messageExpired ) {

    try {

        if (this->internal->unconsumedMessages->isClosed()) {
            return;
        } else if (messageExpired) {
            acknowledge(message, ActiveMQConstants::ACK_TYPE_EXPIRED);
            return;
        } else if (session->isTransacted()) {
            return;
        }

        if (isAutoAcknowledgeEach()) {
            if (this->internal->deliveringAcks.compareAndSet(false, true)) {
                synchronized(&this->internal->deliveredMessages) {
                    if (!this->internal->deliveredMessages.isEmpty()) {
                        if (this->internal->optimizeAcknowledge) {

                            this->internal->ackCounter++;
                            if (this->internal->isTimeForOptimizedAck(this->consumerInfo->getPrefetchSize())) {
                                Pointer<MessageAck> ack =
                                    makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
                                if (ack != NULL) {
                                    this->internal->deliveredMessages.clear();
                                    this->internal->ackCounter = 0;
                                    this->session->sendAck(ack);
                                    this->internal->optimizeAckTimestamp = System::currentTimeMillis();
                                }

                                // As further optimization send ack for expired messages when there
                                // are any. This resets the deliveredCounter to 0 so that we won't
                                // send standard acks with every message just because the deliveredCounter
                                // just below 0.5 * prefetch as used in ackLater()
                                if (this->internal->pendingAck != NULL && this->internal->deliveredCounter > 0) {
                                    this->session->sendAck(this->internal->pendingAck);
                                    this->internal->pendingAck.reset(NULL);
                                    this->internal->deliveredCounter = 0;
                                }
                            }
                        } else {
                            Pointer<MessageAck> ack =
                                makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
                            if (ack != NULL) {
                                this->internal->deliveredMessages.clear();
                                session->sendAck(ack);
                            }
                        }
                    }
                }

                this->internal->deliveringAcks.set(false);
            }
        } else if (isAutoAcknowledgeBatch()) {
            ackLater(message, ActiveMQConstants::ACK_TYPE_CONSUMED);
        } else if (session->isClientAcknowledge() || session->isIndividualAcknowledge()) {
            bool messageUnackedByConsumer = false;
            synchronized(&this->internal->deliveredMessages) {
                messageUnackedByConsumer = this->internal->deliveredMessages.contains(message);
            }

            if (messageUnackedByConsumer) {
                this->ackLater(message, ActiveMQConstants::ACK_TYPE_DELIVERED);
            }
        } else {
            throw IllegalStateException(__FILE__, __LINE__, "Invalid Session State");
        }
    }
    AMQ_CATCH_RETHROW(ActiveMQException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
    AMQ_CATCHALL_THROW(ActiveMQException)
}