private async Task DispatchMessagesAsync()

in src/Microsoft.Azure.SignalR.Emulator/HubEmulator/HubProxyHandler.cs [252:316]


        /// <summary>
        /// Reads the connection's input in a loop, parses every complete hub message, and forwards
        /// invocation messages to the upstream handler until the read is canceled or completed.
        /// </summary>
        /// <remarks>
        /// Messages are parsed with <c>connection.Protocol</c> and <c>InvocationBinder.Instance</c>.
        /// Only messages that are an <c>InvocationMessage</c> for which <c>TryGetPayload</c> succeeds
        /// are forwarded; any other parsed message is consumed without further action. A non-empty
        /// upstream response is written back to the connection's transport output.
        /// </remarks>
        /// <param name="connection">
        /// The hub connection to read from. It is cast to <c>EmulatorHubConnectionContext</c> whenever
        /// an upstream response has to be written back.
        /// </param>
        /// <returns>A task that completes when the read loop exits.</returns>
        /// <exception cref="InvalidDataException">
        /// The input completed while unparsed data was still left in the buffer.
        /// </exception>
        private async Task DispatchMessagesAsync(HubConnectionContext connection)
        {
            var input = GetInput(connection);
            var protocol = connection.Protocol;

            // Begin the client timeout before waiting for the first read.
            InvokeBeginClientTimeout(connection);

            var binder = InvocationBinder.Instance;

            while (true)
            {
                var result = await input.ReadAsync();
                var buffer = result.Buffer;

                try
                {
                    if (result.IsCanceled)
                    {
                        break;
                    }

                    if (!buffer.IsEmpty)
                    {
                        bool messageReceived = false;

                        // No message limit, just parse and dispatch.
                        // TryParse receives the buffer by ref, so the buffer reflects what is left after each message.
                        while (TryParse(protocol, binder, ref buffer, out var message))
                        {
                            // Stop the client timeout while a received message is being handled.
                            InvokeStopClientTimeout(connection);
                            messageReceived = true;
                            if (message is InvocationMessage invocationMessage && TryGetPayload(protocol, invocationMessage, out var payload))
                            {
                                var invocation = new ServerlessProtocol.InvocationMessage(payload, invocationMessage.Target, invocationMessage.InvocationId);
                                var response = await ((IUpstreamMessageHandler)_upstream).WriteMessageAsync(connection, invocation);
                                if (response.Length > 0)
                                {
                                    var output = ((EmulatorHubConnectionContext)connection).ConnectionContext.Transport.Output;

                                    // Write every segment of the response in order. Writing only response.First
                                    // would drop the remainder of a multi-segment response and truncate the payload.
                                    // NOTE(review): response is presumed to be a ReadOnlySequence<byte> (it exposes
                                    // Length and First) - confirm against IUpstreamMessageHandler.
                                    foreach (var segment in response)
                                    {
                                        await output.WriteAsync(segment);
                                    }
                                }
                            }
                        }

                        // Begin the client timeout again only if at least one message was parsed in this read.
                        if (messageReceived)
                        {
                            InvokeBeginClientTimeout(connection);
                        }
                    }

                    if (result.IsCompleted)
                    {
                        // Data left over after parsing means the input ended in the middle of a message.
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Connection terminated while reading a message.");
                        }
                        break;
                    }
                }
                finally
                {
                    // The buffer was sliced up to where it was consumed, so we can just advance to the start.
                    // We mark examined as buffer.End so that if we didn't receive a full frame, we'll wait for more data
                    // before yielding the read again.
                    input.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }