in src/Microsoft.Azure.SignalR/ClientConnections/ClientConnectionContext.cs [291:403]
internal async Task ProcessOutgoingMessagesAsync(SignalRProtocol.IHubProtocol protocol)
{
try
{
while (true)
{
var result = await Application.Input.ReadAsync(OutgoingAborted);
if (result.IsCanceled)
{
break;
}
var buffer = result.Buffer;
if (!buffer.IsEmpty)
{
if (!HandshakeResponseTask.IsCompleted)
{
var next = buffer;
if (SignalRProtocol.HandshakeProtocol.TryParseResponseMessage(ref next, out var message))
{
if (_isMigrated)
{
// simply skip the handshake response.
buffer = buffer.Slice(next.Start);
}
else
{
var dataMessage = new ConnectionDataMessage(ConnectionId, buffer.Slice(0, next.Start))
{
Type = DataMessageType.Handshake
};
var forwardResult = await ForwardMessage(dataMessage);
buffer = forwardResult switch
{
ForwardMessageResult.Success => buffer.Slice(next.Start),
_ => throw new ForwardMessageException(forwardResult),
};
}
_hanshakeCompleteTcs.TrySetResult(null);
}
}
if (HandshakeResponseTask.IsCompleted)
{
var next = buffer;
while (!buffer.IsEmpty && protocol.TryParseMessage(ref next, FakeInvocationBinder.Instance, out var message))
{
// we still want messages to successfully going out when application completes
if (!await _pauseHandler.WaitAsync(StaticRandom.Next(500, 1500), OutgoingAborted))
{
Log.OutgoingTaskPaused(Logger, ConnectionId);
buffer = buffer.Slice(0);
break;
}
try
{
var messageType = message switch
{
SignalRProtocol.HubInvocationMessage => DataMessageType.Invocation,
SignalRProtocol.CloseMessage => DataMessageType.Close,
_ => DataMessageType.Other,
};
var dataMessage = new ConnectionDataMessage(ConnectionId, buffer.Slice(0, next.Start))
{
Type = messageType
};
var forwardResult = await ForwardMessage(dataMessage);
buffer = forwardResult switch
{
ForwardMessageResult.Fatal => throw new ForwardMessageException(forwardResult),
_ => next,
};
}
finally
{
_pauseHandler.Release();
}
}
}
}
if (result.IsCompleted)
{
// This connection ended (the application itself shut down) we should remove it from the list of connections
break;
}
Application.Input.AdvanceTo(buffer.Start, buffer.End);
}
}
catch (OperationCanceledException)
{
// cancelled
}
catch (ForwardMessageException)
{
// do nothing.
}
catch (Exception ex)
{
// The exception means application fail to process input anymore
// Cancel any pending flush so that we can quit and perform disconnect
// Here is abort close and WaitOnApplicationTask will send close message to notify client to disconnect
Log.SendLoopStopped(Logger, ConnectionId, ex);
Application.Output.CancelPendingFlush();
}
finally
{
Application.Input.Complete();
}
}