internal static async Task StreamDataAsync()

in modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs [77:399]


    internal static async Task StreamDataAsync<TSource, TKey, TPayload, TArg, TResult>(
        IAsyncEnumerable<TSource> data,
        Table table,
        Func<TSource, TKey> keySelector,
        Func<TSource, TPayload> payloadSelector,
        IRecordSerializerHandler<TKey> keyWriter,
        DataStreamerOptions options,
        Channel<TResult>? resultChannel,
        IEnumerable<DeploymentUnit> units,
        string receiverClassName,
        TArg receiverArg,
        CancellationToken cancellationToken)
        where TKey : notnull
        where TPayload : notnull
    {
        // Streams items from the async source to the receiver identified by receiverClassName:
        // - Every item is routed to a per-partition batch. The partition is derived from the hash of the key
        //   returned by keySelector; the value returned by payloadSelector is what gets buffered and serialized.
        // - A batch is sent when it reaches options.PageSize items, when the background auto-flush task finds it
        //   stale (options.AutoFlushInterval), or when all batches are drained (end of input, assignment change).
        // - When resultChannel is not null, results are requested and written to it as batches complete.
        // - On any failure, the exception produced by DataStreamerException.Create is thrown; it is given
        //   the original error together with the source items that were not sent successfully.
        IgniteArgumentCheck.NotNull(data);
        DataStreamer.ValidateOptions(options);

        // The source is consumed sequentially, so only this flow adds batches to the dictionary.
        // However, the auto-flush background task reads the dictionary from another thread:
        // insertion and the auto-flush snapshot are guarded with lock (batches), and every batch has its own lock.
        var batches = new Dictionary<int, Batch<TSource, TPayload>>();
        var failedItems = new ConcurrentQueue<TSource>();
        var retryPolicy = new RetryLimitPolicy { RetryLimit = options.RetryLimit };
        var units0 = units as ICollection<DeploymentUnit> ?? units.ToList(); // Avoid multiple enumeration.

        var schema = await table.GetSchemaAsync(null).ConfigureAwait(false);

        var partitionAssignment = await table.GetPartitionAssignmentAsync().ConfigureAwait(false);
        var partitionCount = partitionAssignment.Length; // Can't be changed.
        Debug.Assert(partitionCount > 0, "partitionCount > 0");

        // Type of the first payload; all subsequent payloads must have the same runtime type.
        Type? payloadType = null;
        using var autoFlushCts = new CancellationTokenSource();
        Task? autoFlushTask = null;
        Exception? error = null;

        try
        {
            autoFlushTask = AutoFlushAsync(autoFlushCts.Token);

            await foreach (var item in data.WithCancellation(cancellationToken))
            {
                // WithCancellation passes the token to the producer.
                // However, not all producers support cancellation, so we need to check it here as well.
                cancellationToken.ThrowIfCancellationRequested();

                var newAssignment = await table.GetPartitionAssignmentAsync().ConfigureAwait(false);
                if (newAssignment != partitionAssignment)
                {
                    // Drain all batches to preserve order when partition assignment changes.
                    await Drain().ConfigureAwait(false);
                    partitionAssignment = newAssignment;
                }

                var batch = Add(item);
                if (batch.Count >= options.PageSize)
                {
                    await SendAsync(batch).ConfigureAwait(false);
                }

                // Surface auto-flush failures as soon as possible instead of waiting for the end of the stream.
                if (autoFlushTask.IsFaulted)
                {
                    await autoFlushTask.ConfigureAwait(false);
                }
            }

            await Drain().ConfigureAwait(false);
        }
        catch (Exception e)
        {
            // Remember the error; it is rethrown from the finally block together with the failed items.
            error = e;
        }
        finally
        {
            await autoFlushCts.CancelAsync().ConfigureAwait(false);

            if (autoFlushTask is { })
            {
                try
                {
                    await autoFlushTask.ConfigureAwait(false);
                }
                catch (Exception e)
                {
                    // Cancellation is the expected way to stop the auto-flush task; anything else is a real error.
                    if (e is not OperationCanceledException)
                    {
                        error ??= e;
                    }
                }
            }

            // The auto-flush task has completed, so the dictionary is no longer accessed from another thread.
            foreach (var batch in batches.Values)
            {
                lock (batch)
                {
                    // Items still buffered at this point were never sent.
                    for (var i = 0; i < batch.Count; i++)
                    {
                        failedItems.Enqueue(batch.SourceItems[i]);
                    }

                    GetPool<TPayload>().Return(batch.Items);
                    GetPool<TSource>().Return(batch.SourceItems);

                    Metrics.StreamerItemsQueuedDecrement(batch.Count);
                    Metrics.StreamerBatchesActiveDecrement();
                }
            }

            if (error is { })
            {
                throw DataStreamerException.Create(error, failedItems);
            }

            if (!failedItems.IsEmpty)
            {
                // Should not happen.
                throw DataStreamerException.Create(new InvalidOperationException("Some items were not processed."), failedItems);
            }
        }

        return;

        // Adds the item to the batch of its partition and returns that batch.
        // Owns the tuple builder so that it is disposed even when Add0 throws.
        Batch<TSource, TPayload> Add(TSource item)
        {
            var tupleBuilder = new BinaryTupleBuilder(schema.KeyColumns.Length, hashedColumnsPredicate: schema.HashedColumnIndexProvider);

            try
            {
                return Add0(item, ref tupleBuilder);
            }
            finally
            {
                tupleBuilder.Dispose();
            }
        }

        // Computes the partition from the key hash, validates the payload, and appends the item to the batch.
        Batch<TSource, TPayload> Add0(TSource item, ref BinaryTupleBuilder tupleBuilder)
        {
            // Write key to compute hash.
            var key = keySelector(item);
            keyWriter.Write(ref tupleBuilder, key, schema, keyOnly: true, Span<byte>.Empty);

            // Remainder can be negative for a negative hash; its magnitude is always less than partitionCount.
            var partitionId = Math.Abs(tupleBuilder.GetHash() % partitionCount);
            var batch = GetOrCreateBatch(partitionId);

            var payload = payloadSelector(item);
            IgniteArgumentCheck.NotNull(payload);

            if (payloadType == null)
            {
                payloadType = payload.GetType();
            }
            else if (payloadType != payload.GetType())
            {
                throw new InvalidOperationException(
                    $"All streamer items returned by payloadSelector must be of the same type. " +
                    $"Expected: {payloadType}, actual: {payload.GetType()}.");
            }

            // The auto-flush task can swap the batch buffers concurrently.
            lock (batch)
            {
                batch.Items[batch.Count] = payload;
                batch.SourceItems[batch.Count] = item;

                batch.Count++;
            }

            Metrics.StreamerItemsQueuedIncrement();

            return batch;
        }

        // Returns the batch for the partition, creating it on first use.
        Batch<TSource, TPayload> GetOrCreateBatch(int partitionId)
        {
            // Insertion must not overlap with the snapshot taken by the auto-flush task on another thread.
            lock (batches)
            {
                ref var batchRef = ref CollectionsMarshal.GetValueRefOrAddDefault(batches, partitionId, out _);

                if (batchRef == null)
                {
                    batchRef = new Batch<TSource, TPayload>(options.PageSize, partitionId);
                    Metrics.StreamerBatchesActiveIncrement();
                }

                return batchRef;
            }
        }

        // Starts sending the current contents of the batch and replaces its buffers with fresh ones.
        // Does nothing when the batch was changed concurrently while waiting for the previous send.
        async Task SendAsync(Batch<TSource, TPayload> batch)
        {
            var expectedSize = batch.Count;

            // Wait for the previous task for this batch to finish: preserve order, backpressure control.
            await batch.Task.ConfigureAwait(false);

            lock (batch)
            {
                if (batch.Count != expectedSize || batch.Count == 0)
                {
                    // Concurrent update happened.
                    return;
                }

                // The send task takes ownership of the current buffers and returns them to the pool when done.
                batch.Task = SendAndDisposeBufAsync(batch.PartitionId, batch.Task, batch.Items, batch.SourceItems, batch.Count);

                batch.Items = GetPool<TPayload>().Rent(options.PageSize);
                batch.SourceItems = GetPool<TSource>().Rent(options.PageSize);
                batch.Count = 0;
                batch.LastFlush = Stopwatch.GetTimestamp();

                // The in-flight send decrements the active batches metric when it completes,
                // so the batch with the fresh buffers is counted again.
                Metrics.StreamerBatchesActiveIncrement();
            }
        }

        // Serializes and sends the given items, forwards results to the result channel (if any),
        // and releases all pooled resources. On failure, records the source items as failed and rethrows.
        async Task SendAndDisposeBufAsync(
            int partitionId,
            Task oldTask,
            TPayload[] items,
            TSource[] sourceItems,
            int count)
        {
            // Release the thread that holds the batch lock.
            await Task.Yield();

            var buf = ProtoCommon.GetMessageWriter();
            TResult[]? results = null;

            try
            {
                SerializeBatch(buf, items.AsSpan(0, count), partitionId);

                // ReSharper disable once AccessToModifiedClosure
                var preferredNode = PreferredNode.FromName(partitionAssignment[partitionId] ?? string.Empty);

                // Wait for the previous batch for this node to preserve item order.
                await oldTask.ConfigureAwait(false);
                (results, int resultsCount) = await SendBatchAsync<TResult>(
                    table, buf, count, preferredNode, retryPolicy, expectResults: resultChannel != null).ConfigureAwait(false);

                if (results != null && resultChannel != null)
                {
                    for (var i = 0; i < resultsCount; i++)
                    {
                        TResult result = results[i];
                        await resultChannel.Writer.WriteAsync(result, cancellationToken).ConfigureAwait(false);
                    }
                }
            }
            catch (ChannelClosedException)
            {
                // Consumer does not want more results, stop returning them, but keep streaming.
                resultChannel = null;
            }
            catch (Exception)
            {
                for (var i = 0; i < count; i++)
                {
                    failedItems.Enqueue(sourceItems[i]);
                }

                throw;
            }
            finally
            {
                buf.Dispose();
                GetPool<TPayload>().Return(items);
                GetPool<TSource>().Return(sourceItems);

                if (results != null)
                {
                    GetPool<TResult>().Return(results);
                }

                Metrics.StreamerItemsQueuedDecrement(count);
                Metrics.StreamerBatchesActiveDecrement();
            }
        }

        // Background loop: periodically sends non-empty batches that were not flushed for longer than
        // options.AutoFlushInterval. Stops by throwing OperationCanceledException from Task.Delay
        // or by observing the cancelled token.
        async Task AutoFlushAsync(CancellationToken flushCt)
        {
            while (!flushCt.IsCancellationRequested)
            {
                await Task.Delay(options.AutoFlushInterval, flushCt).ConfigureAwait(false);
                var ts = Stopwatch.GetTimestamp();

                // Iterate over a snapshot: the streaming flow can add batches while this loop awaits SendAsync,
                // and Dictionary does not support enumeration concurrent with modification.
                Batch<TSource, TPayload>[] snapshot;

                lock (batches)
                {
                    snapshot = batches.Values.ToArray();
                }

                foreach (var batch in snapshot)
                {
                    // Stopwatch timestamps are in Stopwatch.Frequency units, which are not TimeSpan ticks;
                    // convert to TimeSpan before comparing with the interval.
                    if (batch.Count > 0 && Stopwatch.GetElapsedTime(batch.LastFlush, ts) > options.AutoFlushInterval)
                    {
                        await SendAsync(batch).ConfigureAwait(false);
                    }
                }
            }
        }

        // Sends all non-empty batches and waits for every in-flight send to complete.
        // Called only from the streaming flow, which is the single writer of the dictionary.
        async Task Drain()
        {
            foreach (var batch in batches.Values)
            {
                if (batch.Count > 0)
                {
                    await SendAsync(batch).ConfigureAwait(false);
                }

                await batch.Task.ConfigureAwait(false);
            }
        }

        // Writes the request: table id, partition id, deployment units, "expect results" flag, receiver payload.
        void SerializeBatch<T>(
            PooledArrayBuffer buf,
            Span<T> items,
            int partitionId)
        {
            // T is one of the supported types (numbers, strings, etc).
            var w = buf.MessageWriter;

            w.Write(table.Id);
            w.Write(partitionId);

            Compute.WriteUnits(units0, buf);

            var expectResults = resultChannel != null;
            w.Write(expectResults);
            WriteReceiverPayload(ref w, receiverClassName, receiverArg, items);
        }
    }