internal async Task ProcessOutgoingMessagesAsync()

in src/Microsoft.Azure.SignalR/ClientConnections/ClientConnectionContext.cs [291:403]


    /// <summary>
    /// Send loop for this connection: repeatedly reads buffered bytes from <c>Application.Input</c>,
    /// splits them into a handshake response followed by individual hub messages, and passes each
    /// piece to <c>ForwardMessage</c> wrapped in a <c>ConnectionDataMessage</c>.
    /// The loop ends when the read is canceled, the input reports completion, or an exception is thrown;
    /// in every case <c>Application.Input</c> is completed before the method returns.
    /// </summary>
    /// <param name="protocol">Hub protocol whose <c>TryParseMessage</c> is used to find message boundaries in the buffered bytes.</param>
    /// <returns>A task that completes when the loop has exited. Exceptions raised inside the loop are handled here and not rethrown.</returns>
    internal async Task ProcessOutgoingMessagesAsync(SignalRProtocol.IHubProtocol protocol)
    {
        try
        {
            while (true)
            {
                // Wait for the next batch of outgoing bytes; OutgoingAborted cancels the wait.
                var result = await Application.Input.ReadAsync(OutgoingAborted);

                if (result.IsCanceled)
                {
                    // The pending read was canceled: leave the loop without touching the buffer.
                    break;
                }

                // `buffer` tracks what is still unprocessed from this read; it is re-sliced as pieces are forwarded.
                var buffer = result.Buffer;

                if (!buffer.IsEmpty)
                {
                    // Phase 1: until the handshake response task has completed, look for a handshake response
                    // at the front of the buffer.
                    if (!HandshakeResponseTask.IsCompleted)
                    {
                        // Parse on a copy; the code below uses next.Start as the position just past the parsed response.
                        var next = buffer;
                        if (SignalRProtocol.HandshakeProtocol.TryParseResponseMessage(ref next, out var message))
                        {
                            if (_isMigrated)
                            {
                                // Migrated connection: simply skip the handshake response without forwarding it.
                                buffer = buffer.Slice(next.Start);
                            }
                            else
                            {
                                // Forward exactly the handshake bytes (buffer start up to next.Start), tagged as Handshake.
                                var dataMessage = new ConnectionDataMessage(ConnectionId, buffer.Slice(0, next.Start))
                                {
                                    Type = DataMessageType.Handshake
                                };
                                var forwardResult = await ForwardMessage(dataMessage);
                                // For the handshake, anything other than Success aborts the loop
                                // (the exception is caught by the ForwardMessageException handler below).
                                buffer = forwardResult switch
                                {
                                    ForwardMessageResult.Success => buffer.Slice(next.Start),
                                    _ => throw new ForwardMessageException(forwardResult),
                                };
                            }
                            // Signal that the handshake response has been handled.
                            // NOTE(review): presumably this TCS backs HandshakeResponseTask, so the check
                            // right below passes in the same iteration and the rest of the buffer is
                            // processed as hub messages — confirm against the field/property definitions.
                            _hanshakeCompleteTcs.TrySetResult(null);
                        }
                    }
                    // Phase 2: once the handshake is done, forward hub messages one at a time.
                    // This is a separate `if` (not `else`) so it also runs right after phase 1 completes.
                    if (HandshakeResponseTask.IsCompleted)
                    {
                        // `next` is advanced by TryParseMessage; `buffer` only catches up after a message is forwarded.
                        var next = buffer;
                        while (!buffer.IsEmpty && protocol.TryParseMessage(ref next, FakeInvocationBinder.Instance, out var message))
                        {
                            // We still want messages to go out successfully when the application completes.
                            // Wait on the pause handler with a randomized timeout (StaticRandom.Next(500, 1500);
                            // presumably milliseconds — confirm against the WaitAsync signature).
                            if (!await _pauseHandler.WaitAsync(StaticRandom.Next(500, 1500), OutgoingAborted))
                            {
                                // The wait did not succeed: log that the outgoing task is paused and stop
                                // processing this read. Slice(0) keeps the whole remaining range, so the
                                // already-parsed message is NOT consumed and stays in the buffer.
                                // NOTE(review): the AdvanceTo call at the bottom of the loop then marks the
                                // entire buffer as examined. If Application.Input is a System.IO.Pipelines
                                // PipeReader, the next ReadAsync will not return until new data arrives or
                                // the input is completed/canceled, so this message is only retried then —
                                // confirm that this is the intended behavior.
                                Log.OutgoingTaskPaused(Logger, ConnectionId);
                                buffer = buffer.Slice(0);
                                break;
                            }

                            try
                            {
                                // Classify the parsed message so the forwarded payload carries a matching type.
                                var messageType = message switch
                                {
                                    SignalRProtocol.HubInvocationMessage => DataMessageType.Invocation,
                                    SignalRProtocol.CloseMessage => DataMessageType.Close,
                                    _ => DataMessageType.Other,
                                };
                                // Payload is the raw bytes of this one message: buffer start up to next.Start.
                                var dataMessage = new ConnectionDataMessage(ConnectionId, buffer.Slice(0, next.Start))
                                {
                                    Type = messageType
                                };
                                var forwardResult = await ForwardMessage(dataMessage);
                                // Unlike the handshake path, only Fatal aborts the loop here; every other
                                // result moves on past this message.
                                buffer = forwardResult switch
                                {
                                    ForwardMessageResult.Fatal => throw new ForwardMessageException(forwardResult),
                                    _ => next,
                                };
                            }
                            finally
                            {
                                // Pair every successful WaitAsync above with a Release, even if forwarding throws.
                                _pauseHandler.Release();
                            }
                        }
                    }
                }

                if (result.IsCompleted)
                {
                    // This connection ended (the application itself shut down); we should remove it from the list of connections.
                    // Exit without calling AdvanceTo; the finally block completes the input.
                    break;
                }

                // Consume everything that was handled (up to buffer.Start) and mark the rest of this read
                // (up to buffer.End) as examined, leaving unprocessed bytes in the input for the next read.
                Application.Input.AdvanceTo(buffer.Start, buffer.End);
            }
        }
        catch (OperationCanceledException)
        {
            // Canceled (e.g. via the OutgoingAborted token passed to ReadAsync/WaitAsync): exit quietly.
        }
        catch (ForwardMessageException)
        {
            // Thrown by this method itself when ForwardMessage reports an unrecoverable result;
            // nothing more to do here beyond leaving the loop.
        }
        catch (Exception ex)
        {
            // The exception means the application failed and its input can no longer be processed.
            // Cancel any pending flush so that we can quit and perform disconnect.
            // This is an abortive close; WaitOnApplicationTask will send a close message to notify the client to disconnect.
            Log.SendLoopStopped(Logger, ConnectionId, ex);
            Application.Output.CancelPendingFlush();
        }
        finally
        {
            // Always complete the input side, whichever way the loop exited.
            Application.Input.Complete();
        }
    }