Elastic.SemanticKernel.Connectors.Elasticsearch/Internal/Linq/AsyncEnumerable.cs (115 lines of code) (raw):

// Copyright (c) Microsoft. All rights reserved.

using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SemanticKernel;

// Used for compatibility with System.Linq.Async Nuget pkg
namespace System.Linq;

/// <summary>
/// Minimal set of LINQ-style operators over <see cref="IAsyncEnumerable{T}"/>.
/// </summary>
[ExcludeFromCodeCoverage]
internal static class AsyncEnumerable
{
    /// <summary>
    /// Returns a shared async-enumerable sequence that contains no elements.
    /// </summary>
    /// <typeparam name="T">The element type of the sequence.</typeparam>
    /// <returns>The cached empty sequence for <typeparamref name="T"/>.</returns>
    public static IAsyncEnumerable<T> Empty<T>() => EmptyAsyncEnumerable<T>.Instance;

#pragma warning disable VSTHRD002 // Avoid problematic synchronous waits
    /// <summary>
    /// Exposes an async-enumerable sequence as a synchronous, lazily evaluated <see cref="IEnumerable{T}"/>.
    /// </summary>
    /// <typeparam name="T">The element type of the sequence.</typeparam>
    /// <param name="source">The async-enumerable sequence to enumerate.</param>
    /// <param name="cancellationToken">The token handed to <see cref="IAsyncEnumerable{T}.GetAsyncEnumerator"/>.</param>
    /// <returns>An enumerable that yields the elements of <paramref name="source"/> in order.</returns>
    /// <remarks>
    /// Every step blocks the calling thread until the pending <c>MoveNextAsync</c> call completes,
    /// and disposal of the async enumerator is awaited synchronously as well.
    /// </remarks>
    public static IEnumerable<T> ToEnumerable<T>(this IAsyncEnumerable<T> source, CancellationToken cancellationToken = default)
    {
        var enumerator = source.GetAsyncEnumerator(cancellationToken);
        try
        {
            while (enumerator.MoveNextAsync().AsTask().GetAwaiter().GetResult())
            {
                yield return enumerator.Current;
            }
        }
        finally
        {
            // Runs when the consumer finishes or abandons the iteration, so the async enumerator is always released.
            enumerator.DisposeAsync().AsTask().GetAwaiter().GetResult();
        }
    }
#pragma warning restore VSTHRD002 // Avoid problematic synchronous waits

#pragma warning disable IDE1006 // Naming rule violation: Missing suffix: 'Async'
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
    /// <summary>
    /// Wraps a synchronous sequence as an <see cref="IAsyncEnumerable{T}"/>.
    /// </summary>
    /// <typeparam name="T">The element type of the sequence.</typeparam>
    /// <param name="source">The sequence to wrap.</param>
    /// <returns>An async-enumerable sequence that yields the elements of <paramref name="source"/> in order.</returns>
    /// <remarks>The iterator contains no awaits, so each element is produced synchronously.</remarks>
    public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(this IEnumerable<T> source)
    {
        foreach (var item in source)
        {
            yield return item;
        }
    }
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
#pragma warning restore IDE1006 // Naming rule violation: Missing suffix: 'Async'

    /// <summary>
    /// Returns the first element of the sequence, or the default value of <typeparamref name="T"/> when the sequence is empty.
    /// </summary>
    /// <typeparam name="T">The element type of the sequence.</typeparam>
    /// <param name="source">The async-enumerable sequence to read from.</param>
    /// <param name="cancellationToken">The optional cancellation token to be used for cancelling the enumeration.</param>
    /// <returns>The first element, or <c>default</c> when there is none.</returns>
    public static async ValueTask<T?> FirstOrDefaultAsync<T>(this IAsyncEnumerable<T> source, CancellationToken cancellationToken = default)
    {
        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            return item;
        }

        return default;
    }

    /// <summary>
    /// Returns the last element of the sequence, or the default value of <typeparamref name="T"/> when the sequence is empty.
    /// </summary>
    /// <typeparam name="T">The element type of the sequence.</typeparam>
    /// <param name="source">The async-enumerable sequence to read from.</param>
    /// <param name="cancellationToken">The optional cancellation token to be used for cancelling the enumeration.</param>
    /// <returns>The last element, or <c>default</c> when there is none.</returns>
    /// <remarks>The whole sequence is enumerated before a result is produced.</remarks>
    public static async ValueTask<T?> LastOrDefaultAsync<T>(this IAsyncEnumerable<T> source, CancellationToken cancellationToken = default)
    {
        var last = default(T)!; // NB: Only matters when hasLast is set to true.
        var hasLast = false;

        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            hasLast = true;
            last = item;
        }

        return hasLast ? last! : default;
    }

    /// <summary>
    /// Collects every element of the sequence into a new <see cref="List{T}"/>.
    /// </summary>
    /// <typeparam name="T">The element type of the sequence.</typeparam>
    /// <param name="source">The async-enumerable sequence to materialize.</param>
    /// <param name="cancellationToken">The optional cancellation token to be used for cancelling the enumeration.</param>
    /// <returns>A list holding the elements of <paramref name="source"/> in enumeration order.</returns>
    public static async ValueTask<List<T>> ToListAsync<T>(this IAsyncEnumerable<T> source, CancellationToken cancellationToken = default)
    {
        var result = new List<T>();

        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            result.Add(item);
        }

        return result;
    }

    /// <summary>
    /// Determines whether the sequence contains the given value, compared with <see cref="EqualityComparer{T}.Default"/>.
    /// </summary>
    /// <typeparam name="T">The element type of the sequence.</typeparam>
    /// <param name="source">The async-enumerable sequence to search.</param>
    /// <param name="value">The value to look for.</param>
    /// <param name="cancellationToken">The optional cancellation token to be used for cancelling the enumeration.</param>
    /// <returns><c>true</c> as soon as a matching element is found; <c>false</c> when the sequence ends without a match.</returns>
    public static async ValueTask<bool> ContainsAsync<T>(this IAsyncEnumerable<T> source, T value, CancellationToken cancellationToken = default)
    {
        // The token is optional and trailing, so existing two-argument call sites keep compiling.
        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            if (EqualityComparer<T>.Default.Equals(item, value))
            {
                return true;
            }
        }

        return false;
    }

    /// <summary>
    /// Counts the elements of the sequence.
    /// </summary>
    /// <typeparam name="T">The element type of the sequence.</typeparam>
    /// <param name="source">The async-enumerable sequence to count.</param>
    /// <param name="cancellationToken">The optional cancellation token to be used for cancelling the enumeration.</param>
    /// <returns>The number of elements in <paramref name="source"/>.</returns>
    /// <exception cref="OverflowException">The number of elements exceeds <see cref="int.MaxValue"/>.</exception>
    public static async ValueTask<int> CountAsync<T>(this IAsyncEnumerable<T> source, CancellationToken cancellationToken = default)
    {
        var count = 0;

        await foreach (var _ in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            // Checked arithmetic: overflow throws instead of silently wrapping around.
            checked
            {
                count++;
            }
        }

        return count;
    }

    /// <summary>
    /// Determines whether any element of an async-enumerable sequence satisfies a condition.
    /// </summary>
    /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
    /// <param name="source">An async-enumerable sequence whose elements to apply the predicate to.</param>
    /// <param name="predicate">A function to test each element for a condition.</param>
    /// <param name="cancellationToken">The optional cancellation token to be used for cancelling the sequence at any time.</param>
    /// <returns>An async-enumerable sequence containing a single element determining whether any elements in the source sequence pass the test in the specified predicate.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
    /// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
    public static ValueTask<bool> AnyAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken = default)
    {
        // Validation happens in the non-async wrapper so it runs before the async state machine starts.
        Verify.NotNull(source);
        Verify.NotNull(predicate);

        return Core(source, predicate, cancellationToken);

        static async ValueTask<bool> Core(IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
        {
            await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
            {
                if (predicate(item))
                {
                    return true;
                }
            }

            return false;
        }
    }

    /// <summary>
    /// Empty sequence that also serves as its own enumerator; a single shared instance exists per element type.
    /// </summary>
    /// <typeparam name="T">The element type of the sequence.</typeparam>
    private sealed class EmptyAsyncEnumerable<T> : IAsyncEnumerable<T>, IAsyncEnumerator<T>
    {
        /// <summary>The shared instance returned by <see cref="AsyncEnumerable.Empty{T}"/>.</summary>
        public static readonly EmptyAsyncEnumerable<T> Instance = new();

        /// <summary>Returns this same object as the enumerator; the token is not used.</summary>
        public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) => this;

        /// <summary>Always reports that there are no further elements.</summary>
        public ValueTask<bool> MoveNextAsync() => new(false);

        /// <summary>Always the default value of <typeparamref name="T"/>, since the sequence has no elements.</summary>
        public T Current => default!;

        /// <summary>Nothing to release; returns a default <see cref="ValueTask"/>.</summary>
        public ValueTask DisposeAsync() => default;
    }
}