in src/State/ConnectionStateTracker.cs [238:282]
/// <summary>
/// Re-sends the <see cref="ConsumerInfo"/> of every consumer tracked by the given
/// session over <paramref name="transport"/>.
/// </summary>
/// <remarks>
/// While the owning connection has not finished its interrupt processing, a consumer
/// with a positive prefetch is restored in pull-only mode: a clone of its info with
/// <c>PrefetchSize</c> set to 0 is sent, and the original info is registered in the
/// connection's <c>RecoveringPullConsumers</c> map. This is only done when the
/// transport's wire format version is greater than 5. If no connection state is
/// tracked for the session's parent connection there is nowhere to register the
/// consumer, so it is restored with its original, unmodified info.
/// </remarks>
/// <param name="transport">Transport the consumer infos are sent on.</param>
/// <param name="sessionState">Session whose consumers are restored.</param>
protected void DoRestoreConsumers(ITransport transport, SessionState sessionState)
{
    // Restore the session's consumers but possibly in pull only (prefetch 0 state) till
    // recovery completes.
    ConnectionState connectionState = null;
    connectionStates.TryGetValue(sessionState.Info.SessionId.ParentId, out connectionState);

    // Pull-mode restoration needs the connection state to record the consumer in;
    // without one, dereferencing it below would throw a NullReferenceException.
    bool recoveryPending = connectionState != null &&
                           !connectionState.ConnectionInterruptProcessingComplete;

    foreach(ConsumerState consumerState in sessionState.ConsumerStates)
    {
        ConsumerInfo infoToSend = consumerState.Info;

        if(recoveryPending && infoToSend.PrefetchSize > 0 && transport.WireFormat.Version > 5)
        {
            // Work on a copy so the tracked ConsumerInfo keeps its original prefetch.
            infoToSend = (ConsumerInfo) consumerState.Info.Clone();

            lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
            {
                if(!connectionState.RecoveringPullConsumers.ContainsKey(infoToSend.ConsumerId))
                {
                    // Register the original info (not the prefetch-0 clone).
                    connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
                }
            }

            infoToSend.PrefetchSize = 0;

            if(Tracer.IsDebugEnabled)
            {
                Tracer.Debug("restore consumer: " + infoToSend.ConsumerId +
                             " in pull mode pending recovery, overriding prefetch: " +
                             consumerState.Info.PrefetchSize);
            }
        }

        if(Tracer.IsDebugEnabled)
        {
            Tracer.Debug("restore consumer: " + infoToSend.ConsumerId);
        }

        transport.Oneway(infoToSend);
    }
}