in src/DotPulsar/Internal/Connection.cs [300:326]
/// <summary>
/// Reads frames from the underlying stream and dispatches each decoded command until the
/// frame sequence ends or an error occurs.
/// </summary>
/// <remarks>
/// Each frame is read as: a 4-byte command size, then the serialized <c>BaseCommand</c> of
/// that size, then (for message commands) the remaining bytes, which are handed on as payload.
/// Any exception raised while reading or dispatching is swallowed, which ends the loop.
/// </remarks>
/// <param name="cancellationToken">
/// Passed to the frame enumeration and to the auth-challenge response send.
/// </param>
/// <returns>
/// A task that completes when frame processing stops. Because every exception is swallowed,
/// the task does not fault.
/// </returns>
public async Task ProcessIncomingFrames(CancellationToken cancellationToken)
{
    // Force the rest of the method to run asynchronously so the caller gets its task back
    // before any frame is read.
    await Task.Yield();

    try
    {
        // ConfigureAwait(false): the loop does not need the caller's synchronization context,
        // and Task.Yield() above resumes on it when one is present.
        await foreach (var frame in _stream.Frames(cancellationToken).ConfigureAwait(false))
        {
            // The first four bytes hold the size of the serialized command.
            // NOTE(review): the 'true' argument presumably selects big-endian decoding - confirm.
            var commandSize = frame.ReadUInt32(0, true);
            var command = Serializer.Deserialize<BaseCommand>(frame.Slice(4, commandSize));

            // When the ping/pong handler returns true the command is not dispatched further.
            if (_pingPongHandler.Incoming(command.CommandType))
            {
                continue;
            }

            if (command.CommandType == BaseCommand.Type.Message)
            {
                // Everything after the command is passed on as the payload, copied into a new array.
                // NOTE(review): the copy is presumably needed because the frame's buffer is not
                // owned beyond this iteration - confirm before removing it.
                _channelManager.Incoming(command.Message, new ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
            }
            else if (command.CommandType == BaseCommand.Type.AuthChallenge)
            {
                // Fire-and-forget: the response is started but not awaited, so the receive loop
                // keeps reading. Failures of the send are not observed here.
                _ = Send(new CommandAuthResponse(), cancellationToken);
            }
            else
            {
                _channelManager.Incoming(command);
            }
        }
    }
    catch
    {
        // Deliberately ignored: any failure (including cancellation surfaced by the frame
        // enumeration) simply stops frame processing without faulting the returned task.
    }
}