in src/DotPulsar/Internal/PulsarStream.cs [75:112]
/// <summary>
/// Pumps data from <c>_stream</c> into <c>_writer</c> until the stream returns zero bytes,
/// the flush result reports completion, or a read/flush throws.
/// The writer is always completed before this method finishes.
/// </summary>
/// <param name="cancellationToken">Token forwarded to every stream read and every pipe flush.</param>
/// <returns>A task that ends after the pipe writer has been completed.</returns>
private async Task FillPipe(CancellationToken cancellationToken)
{
    // Yield immediately so the remainder of the method runs asynchronously to the caller.
    await Task.Yield();

    // Size used for both the requested pipe memory and (on netstandard2.0) the scratch array.
    // NOTE(review): presumably chosen to stay below the 85,000-byte large-object threshold - confirm.
    const int chunkSize = 84999;

    try
    {
#if NETSTANDARD2_0
        // This target reads into a byte[] (array-based ReadAsync overload), so the data is
        // read into a reusable scratch array and then copied into the pipe's memory.
        var scratch = new byte[chunkSize];
#endif
        var keepPumping = true;
        while (keepPumping)
        {
            var destination = _writer.GetMemory(chunkSize);
#if NETSTANDARD2_0
            var received = await _stream.ReadAsync(scratch, 0, scratch.Length, cancellationToken).ConfigureAwait(false);
            new Memory<byte>(scratch, 0, received).CopyTo(destination);
#else
            var received = await _stream.ReadAsync(destination, cancellationToken).ConfigureAwait(false);
#endif
            if (received == 0)
            {
                // A zero-byte read ends the pump without advancing the writer.
                keepPumping = false;
            }
            else
            {
                // Publish the bytes just written, then stop if the flush reports completion.
                _writer.Advance(received);
                var flushed = await _writer.FlushAsync(cancellationToken).ConfigureAwait(false);
                keepPumping = !flushed.IsCompleted;
            }
        }
    }
    catch
    {
        // Intentionally ignored: any failure (cancellation included) simply ends the pump;
        // the writer is still completed in the finally block, without an exception attached.
    }
    finally
    {
        await _writer.CompleteAsync().ConfigureAwait(false);
    }
}