protected void DoRestoreConsumers()

in src/State/ConnectionStateTracker.cs [238:282]


        protected void DoRestoreConsumers(ITransport transport, SessionState sessionState)
        {
            // Restore the session's consumers but possibly in pull only (prefetch 0 state) till
            // recovery completes.

            ConnectionState connectionState = null;
            bool connectionInterruptionProcessingComplete = false;

            if(connectionStates.TryGetValue(sessionState.Info.SessionId.ParentId, out connectionState))
            {
                connectionInterruptionProcessingComplete = connectionState.ConnectionInterruptProcessingComplete;
            }

            // Restore the session's consumers
            foreach(ConsumerState consumerState in sessionState.ConsumerStates)
            {
                ConsumerInfo infoToSend = consumerState.Info;

                if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0 && transport.WireFormat.Version > 5)
                {
                    infoToSend = consumerState.Info.Clone() as ConsumerInfo;
                    lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
                    {
                        if(!connectionState.RecoveringPullConsumers.ContainsKey(infoToSend.ConsumerId))
                        {
                            connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
                        }
                    }
                    infoToSend.PrefetchSize = 0;
                    if(Tracer.IsDebugEnabled)
                    {
                        Tracer.Debug("restore consumer: " + infoToSend.ConsumerId +
                                     " in pull mode pending recovery, overriding prefetch: " +
                                     consumerState.Info.PrefetchSize);
                    }
                }

                if(Tracer.IsDebugEnabled)
                {
                    Tracer.Debug("restore consumer: " + infoToSend.ConsumerId);
                }

                transport.Oneway(infoToSend);
            }
        }