public async ValueTask GetNextValuesAsync()

in src/StreamJsonRpc/Reflection/MessageFormatterEnumerableTracker.cs [230:301]


            public async ValueTask<object> GetNextValuesAsync(CancellationToken cancellationToken)
            {
                try
                {
                    using (cancellationToken.Register(state => ((CancellationTokenSource)state!).Cancel(), this.cancellationTokenSource))
                    {
                        cancellationToken = this.cancellationTokenSource.Token;
                        bool finished = false;
                        var results = new List<T>(this.Settings.MinBatchSize);
                        if (this.readAheadElements is not null)
                        {
                            // Fetch at least the min batch size and at most the number that has been cached up to this point (or until we hit the end of the sequence).
                            // We snap the number of cached elements up front because as we dequeue, we create capacity to store more and we don't want to
                            // collect and return more than MaxReadAhead.
                            int cachedOnEntry = this.readAheadElements.Count;
                            for (int i = 0; !this.readAheadElements.Completion.IsCompleted && (i < this.Settings.MinBatchSize || (cachedOnEntry - results.Count > 0)); i++)
                            {
                                try
                                {
                                    T element = await this.readAheadElements.ReceiveAsync(cancellationToken).ConfigureAwait(false);
                                    results.Add(element);
                                }
                                catch (InvalidOperationException) when (this.readAheadElements.Completion.IsCompleted)
                                {
                                    // Race condition. The sequence is over.
                                    finished = true;
                                    break;
                                }
                            }

                            if (this.readAheadElements.Completion.IsCompleted)
                            {
                                // Rethrow any exceptions.
                                await this.readAheadElements.Completion.ConfigureAwait(false);
                                finished = true;
                            }
                        }
                        else
                        {
                            for (int i = 0; i < this.Settings.MinBatchSize; i++)
                            {
                                if (!await this.enumerator.MoveNextAsync().ConfigureAwait(false))
                                {
                                    finished = true;
                                    break;
                                }

                                results.Add(this.enumerator.Current);
                            }
                        }

                        if (finished)
                        {
                            // Clean up all resources since we don't expect the client to send a dispose notification
                            // since finishing the enumeration implicitly should dispose of it.
                            await this.tracker.OnDisposeAsync(this.token).ConfigureAwait(false);
                        }

                        return new EnumeratorResults<T>
                        {
                            Finished = finished,
                            Values = results,
                        };
                    }
                }
                catch
                {
                    // An error is considered fatal to the enumerable, so clean up everything.
                    await this.tracker.OnDisposeAsync(this.token).ConfigureAwait(false);
                    throw;
                }
            }