public void run()

in activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java [884:1105]


    public void run() {
        MessageDispatch messageDispatch;
        while ((messageDispatch = executor.dequeueNoWait()) != null) {
            final MessageDispatch md = messageDispatch;

            // subset of org.apache.activemq.ActiveMQMessageConsumer.createActiveMQMessage
            final ActiveMQMessage message = (ActiveMQMessage)md.getMessage().copy();
            if (message.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
                ((ActiveMQBlobMessage)message).setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
            }
            if (message.getDataStructureType() == CommandTypes.ACTIVEMQ_OBJECT_MESSAGE) {
                ((ActiveMQObjectMessage)message).setTrustAllPackages(getConnection().isTrustAllPackages());
                ((ActiveMQObjectMessage)message).setTrustedPackages(getConnection().getTrustedPackages());
            }

            MessageAck earlyAck = null;
            if (message.isExpired()) {
                earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1);
                earlyAck.setFirstMessageId(message.getMessageId());
            } else if (connection.isDuplicate(ActiveMQSession.this, message)) {
                LOG.debug("{} got duplicate: {}", this, message.getMessageId());
                earlyAck = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1);
                earlyAck.setFirstMessageId(md.getMessage().getMessageId());
                earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
            }
            if (earlyAck != null) {
                try {
                    asyncSendPacket(earlyAck);
                } catch (Throwable t) {
                    LOG.error("error dispatching ack: {} ", earlyAck, t);
                    connection.onClientInternalException(t);
                } finally {
                    continue;
                }
            }

            if (isClientAcknowledge()||isIndividualAcknowledge()) {
                message.setAcknowledgeCallback(new Callback() {
                    @Override
                    public void execute() throws Exception {
                    }
                });
            }

            if (deliveryListener != null) {
                deliveryListener.beforeDelivery(this, message);
            }

            md.setDeliverySequenceId(getNextDeliveryId());
            lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId();

            final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);

            final AtomicBoolean afterDeliveryError = new AtomicBoolean(false);
            /*
            * The redelivery guard is to allow the endpoint lifecycle to complete before the messsage is dispatched.
            * We dont want the after deliver being called after the redeliver as it may cause some weird stuff.
            * */
            synchronized (redeliveryGuard) {
                try {
                    ack.setFirstMessageId(md.getMessage().getMessageId());
                    doStartTransaction();
                    ack.setTransactionId(getTransactionContext().getTransactionId());
                    if (ack.getTransactionId() != null) {
                        getTransactionContext().addSynchronization(new Synchronization() {

                            final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get());

                            @Override
                            public void beforeEnd() throws Exception {
                                // validate our consumer so we don't push stale acks that get ignored
                                if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
                                    LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection);
                                    throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection);
                                }
                                LOG.trace("beforeEnd ack {}", ack);
                                sendAck(ack);
                            }

                            @Override
                            public void afterRollback() throws Exception {
                                if (LOG.isTraceEnabled()) {
                                    LOG.trace("afterRollback {}", ack, new Throwable("here"));
                                }
                                // ensure we don't filter this as a duplicate
                                connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());

                                // don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect
                                if (clearRequestsCounter.get() > clearRequestCount) {
                                    LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport());
                                    return;
                                }

                                // validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched
                                if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
                                    LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport());
                                    return;
                                }

                                RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
                                int redeliveryCounter = md.getMessage().getRedeliveryCounter();
                                if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
                                        && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) {
                                    // We need to NACK the messages so that they get
                                    // sent to the
                                    // DLQ.
                                    // Acknowledge the last message.
                                    MessageAck ack = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1);
                                    ack.setFirstMessageId(md.getMessage().getMessageId());
                                    ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
                                    LOG.trace("Exceeded redelivery with count: {}, Ack: {}", redeliveryCounter, ack);
                                    asyncSendPacket(ack);

                                } else {

                                    MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
                                    ack.setFirstMessageId(md.getMessage().getMessageId());
                                    asyncSendPacket(ack);

                                    // Figure out how long we should wait to resend
                                    // this message.
                                    long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
                                    for (int i = 0; i < redeliveryCounter; i++) {
                                        redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
                                    }

                                    /*
                                    * If we are a non blocking delivery then we need to stop the executor to avoid more
                                    * messages being delivered, once the message is redelivered we can restart it.
                                    * */
                                    if (!connection.isNonBlockingRedelivery()) {
                                        LOG.debug("Blocking session until re-delivery...");
                                        executor.stop();
                                    }

                                    connection.getScheduler().executeAfterDelay(new Runnable() {

                                        @Override
                                        public void run() {
                                            /*
                                            * wait for the first delivery to be complete, i.e. after delivery has been called.
                                            * */
                                            synchronized (redeliveryGuard) {
                                                /*
                                                * If its non blocking then we can just dispatch in a new session.
                                                * */
                                                if (connection.isNonBlockingRedelivery()) {
                                                    ((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
                                                } else {
                                                    /*
                                                    * If there has been an error thrown during afterDelivery then the
                                                    * endpoint will be marked as dead so redelivery will fail (and eventually
                                                    * the session marked as stale), in this case we can only call dispatch
                                                    * which will create a new session with a new endpoint.
                                                    * */
                                                    if (afterDeliveryError.get()) {
                                                        ((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
                                                    } else {
                                                        executor.executeFirst(md);
                                                        executor.start();
                                                    }
                                                }
                                            }
                                        }
                                    }, redeliveryDelay);
                                }
                                md.getMessage().onMessageRolledBack();
                            }
                        });
                    }

                    LOG.trace("{} onMessage({})", this, message.getMessageId());
                    messageListener.onMessage(message);

                } catch (Throwable e) {
                    if (!isClosed()) {
                        LOG.error("{} error dispatching message: {} ", this, message.getMessageId(), e);
                    }

                    if (getTransactionContext() != null && getTransactionContext().isInXATransaction()) {
                        LOG.debug("Marking transaction: {} rollbackOnly", getTransactionContext());
                        getTransactionContext().setRollbackOnly(true);
                    }

                    // A problem while invoking the MessageListener does not
                    // in general indicate a problem with the connection to the broker, i.e.
                    // it will usually be sufficient to let the afterDelivery() method either
                    // commit or roll back in order to deal with the exception.
                    // However, we notify any registered client internal exception listener
                    // of the problem.
                    connection.onClientInternalException(e);
                } finally {
                    if (ack.getTransactionId() == null) {
                        try {
                            asyncSendPacket(ack);
                        } catch (Throwable e) {
                            connection.onClientInternalException(e);
                        }
                    }
                }

                if (deliveryListener != null) {
                    try {
                        deliveryListener.afterDelivery(this, message);
                    } catch (Throwable t) {
                        LOG.debug("Unable to call after delivery", t);
                        afterDeliveryError.set(true);
                        throw new RuntimeException(t);
                    }
                }
            }
            /*
            * this can be outside the try/catch as if an exception is thrown then this session will be marked as stale anyway.
            * It also needs to be outside the redelivery guard.
            * */
            try {
                executor.waitForQueueRestart();
            } catch (InterruptedException ex) {
                connection.onClientInternalException(ex);
            }
        }
    }