in src/Microsoft.Azure.SignalR.Emulator/HubEmulator/HubProxyHandler.cs [252:316]
/// <summary>
/// Reads the connection's input in a loop, parses every complete message found in the buffer and
/// forwards invocation messages to the upstream handler until the read is canceled or completed.
/// </summary>
/// <remarks>
/// For each parsed <c>InvocationMessage</c> whose payload can be extracted, the message is wrapped in a
/// <c>ServerlessProtocol.InvocationMessage</c> and handed to <c>_upstream</c>; a non-empty response is
/// written back to the connection's transport output. Other message kinds are parsed but not forwarded.
/// The client timeout is stopped as soon as a message is parsed and started again once the
/// batch contained in the current buffer has been processed.
/// </remarks>
/// <param name="connection">The hub connection whose incoming messages are dispatched.</param>
/// <returns>A task that completes when the read loop ends.</returns>
/// <exception cref="InvalidDataException">
/// The input completed while unparsed bytes were still left in the buffer.
/// </exception>
private async Task DispatchMessagesAsync(HubConnectionContext connection)
{
    var input = GetInput(connection);
    var protocol = connection.Protocol;
    InvokeBeginClientTimeout(connection);
    var binder = InvocationBinder.Instance;

    while (true)
    {
        var result = await input.ReadAsync();
        var buffer = result.Buffer;

        try
        {
            if (result.IsCanceled)
            {
                break;
            }

            if (!buffer.IsEmpty)
            {
                bool messageReceived = false;

                // No message limit, just parse and dispatch
                while (TryParse(protocol, binder, ref buffer, out var message))
                {
                    InvokeStopClientTimeout(connection);

                    // Remember that the timeout was stopped so it gets re-armed after this batch.
                    messageReceived = true;

                    if (message is InvocationMessage invocationMessage && TryGetPayload(protocol, invocationMessage, out var payload))
                    {
                        var invocation = new ServerlessProtocol.InvocationMessage(payload, invocationMessage.Target, invocationMessage.InvocationId);
                        var response = await ((IUpstreamMessageHandler)_upstream).WriteMessageAsync(connection, invocation);

                        if (response.Length > 0)
                        {
                            var output = ((EmulatorHubConnectionContext)connection).ConnectionContext.Transport.Output;

                            // NOTE(review): response looks like a ReadOnlySequence<byte> (Length/First) - confirm.
                            // Write every segment, not only the first one, so a multi-segment response
                            // is not truncated; a single-segment response is written exactly as before.
                            foreach (var segment in response)
                            {
                                if (segment.IsEmpty)
                                {
                                    continue;
                                }

                                await output.WriteAsync(segment);
                            }
                        }
                    }
                }

                if (messageReceived)
                {
                    // Only restart the timeout when at least one message was parsed.
                    InvokeBeginClientTimeout(connection);
                }
            }

            if (result.IsCompleted)
            {
                if (!buffer.IsEmpty)
                {
                    // Leftover bytes at completion mean a message was cut off mid-frame.
                    throw new InvalidDataException("Connection terminated while reading a message.");
                }

                break;
            }
        }
        finally
        {
            // The buffer was sliced up to where it was consumed, so we can just advance to the start.
            // We mark examined as buffer.End so that if we didn't receive a full frame, we'll wait for more data
            // before yielding the read again.
            input.AdvanceTo(buffer.Start, buffer.End);
        }
    }
}