internal static async Task StreamDataAsync()

in modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs [62:286]


    /// <summary>
    /// Streams items from <paramref name="data"/> to the cluster in per-partition batches.
    /// <para />
    /// Every item is serialized with <paramref name="writer"/> and appended to the batch of the partition selected
    /// by the tuple hash. A batch is handed to <paramref name="sender"/> when it reaches <c>options.BatchSize</c>
    /// items, when the background auto-flush finds it older than <c>options.AutoFlushFrequency</c>, or when all
    /// batches are drained (end of data, or a change of partition assignment).
    /// </summary>
    /// <param name="data">Source sequence; consumed sequentially.</param>
    /// <param name="sender">Callback that sends a filled buffer for the given partition using the given retry policy.</param>
    /// <param name="writer">Serializer that writes an item into a binary tuple builder.</param>
    /// <param name="schemaProvider">Provides the schema; invoked once, before streaming starts.</param>
    /// <param name="partitionAssignmentProvider">Provides the current partition assignment;
    /// a <c>null</c> result routes all items to the default connection.</param>
    /// <param name="options">Streamer options (batch size, auto-flush frequency, retry limit).</param>
    /// <param name="cancellationToken">Token that cancels consumption of <paramref name="data"/>.</param>
    /// <typeparam name="T">Item type.</typeparam>
    /// <returns>A task that completes after all batches have been drained and their send tasks awaited.</returns>
    internal static async Task StreamDataAsync<T>(
        IAsyncEnumerable<T> data,
        Func<PooledArrayBuffer, string, IRetryPolicy, Task> sender,
        IRecordSerializerHandler<T> writer,
        Func<Task<Schema>> schemaProvider,
        Func<ValueTask<string[]?>> partitionAssignmentProvider,
        DataStreamerOptions options,
        CancellationToken cancellationToken)
    {
        IgniteArgumentCheck.NotNull(data);

        IgniteArgumentCheck.Ensure(options.BatchSize > 0, $"{nameof(options.BatchSize)} should be positive.");
        IgniteArgumentCheck.Ensure(options.AutoFlushFrequency > TimeSpan.Zero, $"{nameof(options.AutoFlushFrequency)} should be positive.");
        IgniteArgumentCheck.Ensure(options.RetryLimit >= 0, $"{nameof(options.RetryLimit)} should be non-negative.");

        // The source is consumed sequentially, so only the producer loop below adds batches.
        // However, the auto-flush background task enumerates this map while the producer may be adding to it,
        // which a plain Dictionary does not support - hence the concurrent map.
        // Locking on individual batches is still required to guard their buffers and counters.
        var batches = new System.Collections.Concurrent.ConcurrentDictionary<string, Batch>();
        var retryPolicy = new RetryLimitPolicy { RetryLimit = options.RetryLimit };

        // TODO: IGNITE-19710 Data Streamer schema synchronization
        var schema = await schemaProvider().ConfigureAwait(false);
        var partitionAssignment = await partitionAssignmentProvider().ConfigureAwait(false);
        var lastPartitionsAssignmentCheck = Stopwatch.StartNew();
        using var flushCts = new CancellationTokenSource();

        try
        {
            // Fire-and-forget: the task is stopped via flushCts in the finally block.
            _ = AutoFlushAsync(flushCts.Token);

            await foreach (var item in data.WithCancellation(cancellationToken))
            {
                // WithCancellation passes the token to the producer.
                // However, not all producers support cancellation, so we need to check it here as well.
                cancellationToken.ThrowIfCancellationRequested();

                var (batch, partition) = Add(item);
                if (batch.Count >= options.BatchSize)
                {
                    await SendAsync(batch, partition).ConfigureAwait(false);
                }

                if (lastPartitionsAssignmentCheck.Elapsed > PartitionAssignmentUpdateFrequency)
                {
                    var newAssignment = await partitionAssignmentProvider().ConfigureAwait(false);

                    // Reference comparison: a different array instance is treated as a changed assignment.
                    if (newAssignment != partitionAssignment)
                    {
                        // Drain all batches to preserve order when partition assignment changes.
                        await Drain().ConfigureAwait(false);
                        partitionAssignment = newAssignment;
                    }

                    lastPartitionsAssignmentCheck.Restart();
                }
            }

            await Drain().ConfigureAwait(false);
        }
        finally
        {
            flushCts.Cancel();

            // Release the current (unsent) buffer of every batch and roll back the metrics that track it.
            foreach (var batch in batches.Values)
            {
                batch.Buffer.Dispose();

                Metrics.StreamerItemsQueuedDecrement(batch.Count);
                Metrics.StreamerBatchesActiveDecrement();
            }
        }

        // Serializes the item into a temporary tuple builder and appends it to the matching batch.
        (Batch Batch, string Partition) Add(T item)
        {
            var tupleBuilder = new BinaryTupleBuilder(schema.Columns.Count, hashedColumnsPredicate: schema);

            try
            {
                return Add0(item, ref tupleBuilder);
            }
            finally
            {
                tupleBuilder.Dispose();
            }
        }

        // Writes the item, picks the partition by tuple hash, and appends the "no value" bit set
        // followed by the tuple bytes to the partition batch.
        (Batch Batch, string Partition) Add0(T item, ref BinaryTupleBuilder tupleBuilder)
        {
            var columnCount = schema.Columns.Count;

            // Use MemoryMarshal to work around [CS8352]: "Cannot use variable 'noValueSet' in this context
            // because it may expose referenced variables outside of their declaration scope".
            // The alias must have the same length as the stack buffer: a longer span would cover memory
            // beyond the stackalloc'ed region.
            Span<byte> noValueSet = stackalloc byte[columnCount / 8 + 1];
            Span<byte> noValueSetRef = MemoryMarshal.CreateSpan(ref MemoryMarshal.GetReference(noValueSet), noValueSet.Length);

            writer.Write(ref tupleBuilder, item, schema, columnCount, noValueSetRef);

            var partition = partitionAssignment == null
                ? string.Empty // Default connection.
                : partitionAssignment[Math.Abs(tupleBuilder.GetHash() % partitionAssignment.Length)];

            var batch = GetOrCreateBatch(partition);

            lock (batch)
            {
                batch.Count++;

                noValueSet.CopyTo(batch.Buffer.MessageWriter.WriteBitSet(columnCount));
                batch.Buffer.MessageWriter.Write(tupleBuilder.Build().Span);
            }

            Metrics.StreamerItemsQueuedIncrement();

            return (batch, partition);
        }

        // Returns the batch for the partition, creating and registering it on first use.
        Batch GetOrCreateBatch(string partition)
        {
            if (batches.TryGetValue(partition, out var existing))
            {
                return existing;
            }

            // Only the sequential producer loop calls this method,
            // so nothing can add the same key between the lookup above and the assignment below.
            var created = new Batch();
            batches[partition] = created;
            InitBuffer(created);

            Metrics.StreamerBatchesActiveIncrement();

            return created;
        }

        // Finalizes the current buffer of the batch, starts sending it, and swaps in a fresh buffer.
        async Task SendAsync(Batch batch, string partition)
        {
            var expectedSize = batch.Count;

            // Wait for the previous task for this batch to finish: preserve order, backpressure control.
            await batch.Task.ConfigureAwait(false);

            lock (batch)
            {
                if (batch.Count != expectedSize || batch.Count == 0)
                {
                    // Concurrent update happened.
                    return;
                }

                var buf = batch.Buffer;

                // See RecordSerializer.WriteMultiple.
                // Fill in the 5 bytes reserved by InitBuffer: type code + big-endian item count.
                buf.WriteByte(MsgPackCode.Int32, batch.CountPos);
                buf.WriteIntBigEndian(batch.Count, batch.CountPos + 1);

                batch.Task = SendAndDisposeBufAsync(buf, partition, batch.Task, batch.Count);

                batch.Count = 0;
                batch.Buffer = ProtoCommon.GetMessageWriter(); // Prev buf will be disposed in SendAndDisposeBufAsync.
                InitBuffer(batch);
                batch.LastFlush = Stopwatch.GetTimestamp();

                // Accounts for the new buffer; the one being sent is decremented in SendAndDisposeBufAsync.
                Metrics.StreamerBatchesActiveIncrement();
            }
        }

        // Sends the buffer after the previous send for the same batch completes; always disposes the buffer.
        async Task SendAndDisposeBufAsync(PooledArrayBuffer buf, string partition, Task oldTask, int count)
        {
            try
            {
                // Wait for the previous batch for this node to preserve item order.
                await oldTask.ConfigureAwait(false);
                await sender(buf, partition, retryPolicy).ConfigureAwait(false);

                Metrics.StreamerBatchesSent.Add(1);
                Metrics.StreamerItemsSent.Add(count);
            }
            finally
            {
                buf.Dispose();

                Metrics.StreamerItemsQueuedDecrement(count);
                Metrics.StreamerBatchesActiveDecrement();
            }
        }

        // Background loop: periodically sends non-empty batches that have not been flushed for longer
        // than the configured auto-flush frequency. Ends when the token is cancelled.
        async Task AutoFlushAsync(CancellationToken flushCt)
        {
            while (!flushCt.IsCancellationRequested)
            {
                await Task.Delay(options.AutoFlushFrequency, flushCt).ConfigureAwait(false);
                var ts = Stopwatch.GetTimestamp();

                foreach (var (partition, batch) in batches)
                {
                    // Stopwatch timestamps are measured in Stopwatch.Frequency units, which are not
                    // necessarily TimeSpan ticks, so convert to seconds before comparing.
                    var secondsSinceLastFlush = (ts - batch.LastFlush) / (double)Stopwatch.Frequency;

                    if (batch.Count > 0 && secondsSinceLastFlush > options.AutoFlushFrequency.TotalSeconds)
                    {
                        await SendAsync(batch, partition).ConfigureAwait(false);
                    }
                }
            }
        }

        // Writes the batch header (table id, no transaction, schema version) and reserves space for the item count.
        void InitBuffer(Batch batch)
        {
            var buf = batch.Buffer;

            var w = buf.MessageWriter;
            w.Write(schema.TableId);
            w.WriteTx(null);
            w.Write(schema.Version);

            batch.CountPos = buf.Position;
            buf.Advance(5); // Reserve count.
        }

        // Sends every non-empty batch and waits for all in-flight send tasks.
        async Task Drain()
        {
            foreach (var (partition, batch) in batches)
            {
                if (batch.Count > 0)
                {
                    await SendAsync(batch, partition).ConfigureAwait(false);
                }

                await batch.Task.ConfigureAwait(false);
            }
        }
    }