public void Dispatch()

in src/main/csharp/MessageConsumer.cs [436:513]


        public void Dispatch(MessageDispatch dispatch)
        {
            MessageListener listener = this.listener;

            try
            {
                lock(this.unconsumedMessages.SyncRoot)
                {
                    if(this.clearDispatchList)
                    {
                        // we are reconnecting so lets flush the in progress messages
                        this.clearDispatchList = false;
                        this.unconsumedMessages.Clear();

                        if(this.pendingAck != null)
                        {
                            // on resumption a pending delivered ack will be out of sync with
                            // re-deliveries.
                            if(Tracer.IsDebugEnabled)
                            {
                                Tracer.Debug("removing pending delivered ack on transport interupt: " + pendingAck);
                            }
                            this.pendingAck = null;
                        }
                    }

                    if(!this.unconsumedMessages.Closed)
                    {
                        if(listener != null && this.unconsumedMessages.Running)
                        {
                            Message message = CreateStompMessage(dispatch);

                            this.BeforeMessageIsConsumed(dispatch);

                            try
                            {
								bool expired = (!IgnoreExpiration && message.IsExpired());

                                if(!expired)
                                {
                                    listener(message);
                                }

                                this.AfterMessageIsConsumed(dispatch, expired);
                            }
                            catch(Exception e)
                            {
                                if(this.session.IsAutoAcknowledge || this.session.IsIndividualAcknowledge)
                                {
                                    // Redeliver the message
                                }
                                else
                                {
                                    // Transacted or Client ack: Deliver the next message.
                                    this.AfterMessageIsConsumed(dispatch, false);
                                }

                                Tracer.Error(this.info.ConsumerId + " Exception while processing message: " + e);
                            }
                        }
                        else
                        {
                            this.unconsumedMessages.Enqueue(dispatch);
                        }
                    }
                }

                if(++dispatchedCount % 1000 == 0)
                {
                    dispatchedCount = 0;
                    Thread.Sleep((int)1);
                }
            }
            catch(Exception e)
            {
                this.session.Connection.OnSessionException(this.session, e);
            }
        }