in src/StreamJsonRpc/Reflection/MessageFormatterEnumerableTracker.cs [230:301]
public async ValueTask<object> GetNextValuesAsync(CancellationToken cancellationToken)
{
    // Gathers the next batch of elements and reports whether the sequence has ended.
    // Elements come from the read-ahead buffer when one exists, otherwise directly from the enumerator.
    // Reaching the end of the sequence, or any exception, triggers tracker.OnDisposeAsync for this token.
    try
    {
        // While this call is in flight, cancellation of the caller's token is forwarded to this object's own
        // token source; everything below observes that source's token instead of the caller's.
        using CancellationTokenRegistration callerCancellation = cancellationToken.Register(
            static cts => ((CancellationTokenSource)cts!).Cancel(),
            this.cancellationTokenSource);
        CancellationToken enumeratorToken = this.cancellationTokenSource.Token;

        var batch = new List<T>(this.Settings.MinBatchSize);
        bool reachedEnd = false;

        if (this.readAheadElements is null)
        {
            // No read-ahead buffer: advance the enumerator until the minimum batch size is met or it runs dry.
            while (batch.Count < this.Settings.MinBatchSize)
            {
                bool advanced = await this.enumerator.MoveNextAsync().ConfigureAwait(false);
                if (!advanced)
                {
                    reachedEnd = true;
                    break;
                }

                batch.Add(this.enumerator.Current);
            }
        }
        else
        {
            // Take at least the minimum batch size, and beyond that only as many elements as were already
            // buffered when this call began (or fewer, if the sequence ends first).
            // The buffered count is captured once up front: each dequeue frees capacity for more read-ahead,
            // and the batch must not grow past what was cached on entry.
            int bufferedAtStart = this.readAheadElements.Count;
            while (!this.readAheadElements.Completion.IsCompleted
                && (batch.Count < this.Settings.MinBatchSize || batch.Count < bufferedAtStart))
            {
                try
                {
                    batch.Add(await this.readAheadElements.ReceiveAsync(enumeratorToken).ConfigureAwait(false));
                }
                catch (InvalidOperationException) when (this.readAheadElements.Completion.IsCompleted)
                {
                    // The buffer completed between the loop check and the receive: the sequence is over.
                    reachedEnd = true;
                    break;
                }
            }

            if (this.readAheadElements.Completion.IsCompleted)
            {
                // Awaiting the completion task rethrows any exception it carries.
                await this.readAheadElements.Completion.ConfigureAwait(false);
                reachedEnd = true;
            }
        }

        if (reachedEnd)
        {
            // Finishing the enumeration implicitly disposes it, so no dispose notification is expected
            // from the client; release all resources now.
            await this.tracker.OnDisposeAsync(this.token).ConfigureAwait(false);
        }

        return new EnumeratorResults<T>
        {
            Finished = reachedEnd,
            Values = batch,
        };
    }
    catch
    {
        // Any error is treated as fatal to the enumerable: clean up everything, then let the error propagate.
        await this.tracker.OnDisposeAsync(this.token).ConfigureAwait(false);
        throw;
    }
}