in src/DotPulsar/Internal/Connection.cs [351:380]
/// <summary>
/// Enumerates the frames produced by the underlying stream and dispatches each decoded command:
/// every command type is first reported to the ping/pong handler, then messages and all other
/// commands go to the channel manager, while auth challenges and pings are answered in the background.
/// </summary>
/// <param name="cancellationToken">Token passed to the frame enumeration and to every reply that is sent.</param>
/// <returns>A task that completes once the frame loop has stopped. Exceptions raised inside the loop are swallowed.</returns>
private async Task ProcessIncomingFrames(CancellationToken cancellationToken)
{
    // Hand control back to the caller right away; the rest of the method runs as a continuation.
    await Task.Yield();

    try
    {
        await foreach (var frame in _stream.Frames(cancellationToken).ConfigureAwait(false))
        {
            // The first four bytes of a frame hold the size of the serialized command that follows.
            // NOTE(review): the 'true' argument presumably selects big-endian decoding - confirm against ReadUInt32.
            var commandSize = frame.ReadUInt32(0, true);
            var command = Serializer.Deserialize<BaseCommand>(frame.Slice(4, commandSize));

            // The ping/pong handler is told about every incoming command type, not only Ping and Pong.
            _pingPongHandler.Incoming(command.CommandType);

            if (command.CommandType == BaseCommand.Type.Message)
            {
                // Everything after the command is handed over as a copy (ToArray), so the data given
                // to the channel manager is a separate array rather than a slice of the frame.
                _channelManager.Incoming(command.Message, new ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
            }
            else if (command.CommandType == BaseCommand.Type.AuthChallenge)
            {
                // Fire-and-forget so that sending the reply cannot stall the read loop.
                // Task.Run is used instead of Task.Factory.StartNew: it always targets the default
                // scheduler (not whatever TaskScheduler.Current happens to be) and unwraps the async delegate.
                _ = Task.Run(async () => await Send(new CommandAuthResponse(), cancellationToken).ConfigureAwait(false));
            }
            else if (command.CommandType == BaseCommand.Type.Ping)
            {
                // Same fire-and-forget reasoning as for the auth response above.
                _ = Task.Run(async () => await Send(new CommandPong(), cancellationToken).ConfigureAwait(false));
            }
            else if (command.CommandType == BaseCommand.Type.Pong)
            {
                // Nothing more to do; the ping/pong handler has already seen it.
                continue;
            }
            else
            {
                _channelManager.Incoming(command);
            }
        }
    }
    catch
    {
        // ignored
        // NOTE(review): any exception (cancellation, stream failure, deserialization error) silently
        // ends the loop here. Presumably teardown is handled by the owner of this task - confirm,
        // and consider logging once a logger is available in this class.
    }
}