private async Task ProcessIncomingFrames()

in src/DotPulsar/Internal/Connection.cs [351:380]


    /// <summary>
    /// Reads frames from the underlying stream and dispatches each decoded command: every command type is
    /// reported to the ping/pong handler, messages and other commands go to the channel manager, and
    /// auth challenges and pings are answered with a background reply.
    /// </summary>
    /// <param name="cancellationToken">Token passed to the frame enumeration and to the background replies.</param>
    /// <returns>
    /// A task that completes when frame processing stops. Exceptions raised while reading or dispatching
    /// frames are swallowed, so they do not surface through this task.
    /// </returns>
    private async Task ProcessIncomingFrames(CancellationToken cancellationToken)
    {
        // Return control to the caller right away; the read loop continues asynchronously.
        await Task.Yield();

        try
        {
            await foreach (var frame in _stream.Frames(cancellationToken).ConfigureAwait(false))
            {
                // Frame layout as consumed here: a 4-byte command size, the serialized command,
                // then any remaining bytes (used as the payload for messages).
                var commandSize = frame.ReadUInt32(0, true);
                var command = Serializer.Deserialize<BaseCommand>(frame.Slice(4, commandSize));

                // The ping/pong handler is told about every incoming command type, not only pings and pongs.
                _pingPongHandler.Incoming(command.CommandType);

                switch (command.CommandType)
                {
                    case BaseCommand.Type.Message:
                        // The bytes after the command are copied into a fresh array before being handed over.
                        _channelManager.Incoming(command.Message, new ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
                        break;

                    case BaseCommand.Type.AuthChallenge:
                        // Fire-and-forget so the reply never stalls the read loop. Task.Run (rather than
                        // Task.Factory.StartNew) always uses the default scheduler and unwraps the async delegate.
                        _ = Task.Run(async () => await Send(new CommandAuthResponse(), cancellationToken).ConfigureAwait(false));
                        break;

                    case BaseCommand.Type.Ping:
                        // Same fire-and-forget approach as for the auth response.
                        _ = Task.Run(async () => await Send(new CommandPong(), cancellationToken).ConfigureAwait(false));
                        break;

                    case BaseCommand.Type.Pong:
                        // Nothing further to do; the ping/pong handler has already been notified above.
                        break;

                    default:
                        _channelManager.Incoming(command);
                        break;
                }
            }
        }
        catch
        {
            // Deliberately ignored: any failure (including cancellation) simply ends frame processing.
        }
    }