internal static async Task StreamDataAsync()

in modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs [67:422]


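    /// <summary>
    /// Streams data into the table: items are serialized into per-partition batches and sent in order,
    /// with retries, schema refresh on version mismatch, and periodic auto-flush of idle batches.
    /// Items that could not be delivered are reported via <see cref="DataStreamerException"/>.
    /// </summary>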
    internal static async Task StreamDataAsync<T>(
        IAsyncEnumerable<DataStreamerItem<T>> data,
        Table table,
        IRecordSerializerHandler<T> writer,
        DataStreamerOptions options,
        CancellationToken cancellationToken)
    {
        IgniteArgumentCheck.NotNull(data);
        ValidateOptions(options);

        // A ConcurrentDictionary is not necessary because we consume the source sequentially.
        // However, per-batch locking is required because the auto-flush background task accesses batches concurrently.
        var batches = new Dictionary<int, Batch<T>>();
        var failedItems = new ConcurrentQueue<DataStreamerItem<T>>();
        var retryPolicy = new RetryLimitPolicy { RetryLimit = options.RetryLimit };

        var schema = await table.GetSchemaAsync(null).ConfigureAwait(false);
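        // Serializes schema refreshes: several in-flight batches can observe the same schema mismatch concurrently.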
        var schemaLock = new SemaphoreSlim(1);

        var partitionAssignment = await table.GetPartitionAssignmentAsync().ConfigureAwait(false);
        var partitionCount = partitionAssignment.Length; // Partition count never changes for a given table.
        Debug.Assert(partitionCount > 0, "partitionCount > 0");

        using var autoFlushCts = new CancellationTokenSource();
        Task? autoFlushTask = null;
        Exception? error = null;

        try
        {
            autoFlushTask = AutoFlushAsync(autoFlushCts.Token);

            await foreach (var item in data.WithCancellation(cancellationToken))
            {
                // WithCancellation passes the token to the producer.
                // However, not all producers support cancellation, so we need to check it here as well.
                cancellationToken.ThrowIfCancellationRequested();

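                // The assignment array instance is cached and replaced only when the assignment actually
                // changes, so a reference comparison is enough to detect an update.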
                var newAssignment = await table.GetPartitionAssignmentAsync().ConfigureAwait(false);
                if (newAssignment != partitionAssignment)
                {
                    // Drain all batches to preserve order when partition assignment changes.
                    await Drain().ConfigureAwait(false);
                    partitionAssignment = newAssignment;
                }

                var batch = await AddWithRetryUnmapped(item).ConfigureAwait(false);
                if (batch.Count >= options.PageSize)
                {
                    await SendAsync(batch).ConfigureAwait(false);
                }

                if (autoFlushTask.IsFaulted)
                {
                    await autoFlushTask.ConfigureAwait(false);
                }
            }

            await Drain().ConfigureAwait(false);
        }
        catch (Exception e)
        {
            error = e;
        }
        finally
        {
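            // Stop the auto-flush loop, report any unsent items as failed, and release pooled resources.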
            await autoFlushCts.CancelAsync().ConfigureAwait(false);

            if (autoFlushTask is { })
            {
                try
                {
                    await autoFlushTask.ConfigureAwait(false);
                }
                catch (Exception e)
                {
                    if (e is not OperationCanceledException)
                    {
                        error ??= e;
                    }
                }
            }

            foreach (var batch in batches.Values)
            {
                lock (batch)
                {
                    for (var i = 0; i < batch.Count; i++)
                    {
                        failedItems.Enqueue(batch.Items[i]);
                    }

                    batch.Buffer.Dispose();
                    GetPool<T>().Return(batch.Items);

                    Metrics.StreamerItemsQueuedDecrement(batch.Count);
                    Metrics.StreamerBatchesActiveDecrement();
                }
            }

            if (error is { })
            {
                throw DataStreamerException.Create(error, failedItems);
            }

            if (!failedItems.IsEmpty)
            {
                // Should not happen.
                throw DataStreamerException.Create(new InvalidOperationException("Some items were not processed."), failedItems);
            }
        }

        return;

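        // Adds an item to its batch; if the item contains columns unknown to the cached schema,
        // refreshes the schema once and retries.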
        async ValueTask<Batch<T>> AddWithRetryUnmapped(DataStreamerItem<T> item)
        {
            try
            {
                return Add(item);
            }
            catch (Exception e) when (e.CausedByUnmappedColumns())
            {
                await UpdateSchema().ConfigureAwait(false);
                return Add(item);
            }
        }

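        // Serializes a single item using the current schema; removals are serialized with key columns only.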
        Batch<T> Add(DataStreamerItem<T> item)
        {
            var schema0 = schema;

            var columnCount = item.OperationType == DataStreamerOperationType.Remove
                ? schema0.KeyColumns.Length
                : schema0.Columns.Length;

            var tupleBuilder = new BinaryTupleBuilder(columnCount, hashedColumnsPredicate: schema0.HashedColumnIndexProvider);

            try
            {
                return Add0(item, ref tupleBuilder, schema0);
            }
            finally
            {
                tupleBuilder.Dispose();
            }
        }

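        // Writes the item into a binary tuple, derives the target partition from the key hash,
        // and appends the serialized data to that partition's batch.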
        Batch<T> Add0(DataStreamerItem<T> item, ref BinaryTupleBuilder tupleBuilder, Schema schema0)
        {
            var columnCount = schema0.Columns.Length;

            // Use MemoryMarshal to work around [CS8352]: "Cannot use variable 'noValueSet' in this context
            // because it may expose referenced variables outside of their declaration scope".
            // The re-created span must not claim more bytes than the stack allocation provides.
            Span<byte> noValueSet = stackalloc byte[columnCount / 8 + 1];
            Span<byte> noValueSetRef = MemoryMarshal.CreateSpan(ref MemoryMarshal.GetReference(noValueSet), noValueSet.Length);

            var keyOnly = item.OperationType == DataStreamerOperationType.Remove;
            writer.Write(ref tupleBuilder, item.Data, schema0, keyOnly: keyOnly, noValueSetRef);

            var partitionId = Math.Abs(tupleBuilder.GetHash() % partitionCount);
            var batch = GetOrCreateBatch(partitionId);

            lock (batch)
            {
                batch.Items[batch.Count++] = item;

                if (batch.Schema != schema0)
                {
                    batch.SchemaOutdated = true;
                }

                // 1. To compute target partition, we need key hash.
                // 2. To compute key hash, we need to serialize the key.
                // 3. Since we already serialized the key, we can use it for the message body and avoid re-serialization.
                // However, if schema gets updated, we need to re-serialize the whole batch.
                // Schema update is rare, so we optimize for the happy path.
                if (!batch.SchemaOutdated)
                {
                    noValueSet.CopyTo(batch.Buffer.MessageWriter.WriteBitSet(columnCount));
                    batch.Buffer.MessageWriter.Write(tupleBuilder.Build().Span);
                }
            }

            Metrics.StreamerItemsQueuedIncrement();

            return batch;
        }

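        // Returns the batch for the given partition, creating and initializing it on first access.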
        Batch<T> GetOrCreateBatch(int partitionId)
        {
            ref var batchRef = ref CollectionsMarshal.GetValueRefOrAddDefault(batches, partitionId, out _);

            if (batchRef == null)
            {
                batchRef = new Batch<T>(options.PageSize, schema, partitionId);
                InitBuffer(batchRef, schema);

                Metrics.StreamerBatchesActiveIncrement();
            }

            return batchRef;
        }

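        // Hands the filled batch off to a background send and resets it for reuse; awaiting the
        // previous send task preserves per-partition ordering and provides backpressure.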
        async Task SendAsync(Batch<T> batch)
        {
            var expectedSize = batch.Count;

            // Wait for the previous send task for this batch to finish: this preserves ordering and provides backpressure.
            await batch.Task.ConfigureAwait(false);

            lock (batch)
            {
                if (batch.Count != expectedSize || batch.Count == 0)
                {
                    // Concurrent update happened.
                    return;
                }

                FinalizeBatchHeader(batch);
                batch.Task = SendAndDisposeBufAsync(
                    batch.Buffer, batch.PartitionId, batch.Task, batch.Items, batch.Count, batch.SchemaOutdated, batch.Schema.Version);

                batch.Items = GetPool<T>().Rent(options.PageSize);
                batch.Count = 0;
                batch.Schema = schema;
                batch.SchemaOutdated = false;
                batch.Buffer = ProtoCommon.GetMessageWriter(); // Prev buf will be disposed in SendAndDisposeBufAsync.
                InitBuffer(batch, batch.Schema);
                batch.LastFlush = Stopwatch.GetTimestamp();

                Metrics.StreamerBatchesActiveIncrement();
            }
        }

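        // Sends a serialized batch to the preferred node, re-serializing with a fresh schema when the
        // server reports a version mismatch; on failure, enqueues the items as failed. Always disposes
        // the buffer and returns the pooled items array.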
        async Task SendAndDisposeBufAsync(
            PooledArrayBuffer buf,
            int partitionId,
            Task oldTask,
            DataStreamerItem<T>[] items,
            int count,
            bool batchSchemaOutdated,
            int batchSchemaVer)
        {
            try
            {
                Debug.Assert(items.Length > 0, "items.Length > 0");
                var schema0 = schema;

                if (batchSchemaOutdated || batchSchemaVer < schema0.Version)
                {
                    // Schema update was detected while the batch was being filled.
                    // Re-serialize the whole batch.
                    ReWriteBatch(buf, partitionId, schema0, items.AsSpan(0, count), writer);
                }

                // ReSharper disable once AccessToModifiedClosure
                var preferredNode = PreferredNode.FromName(partitionAssignment[partitionId] ?? string.Empty);

                int? schemaVersion = null;
                while (true)
                {
                    try
                    {
                        if (schemaVersion is { })
                        {
                            // Serialize again with the new schema.
                            schema0 = await UpdateSchema(schemaVersion.Value).ConfigureAwait(false);
                            ReWriteBatch(buf, partitionId, schema0, items.AsSpan(0, count), writer);
                        }

                        // Wait for the previous batch for this node to preserve item order.
                        await oldTask.ConfigureAwait(false);
                        await SendBatchAsync(table, buf, count, preferredNode, retryPolicy).ConfigureAwait(false);

                        return;
                    }
                    catch (IgniteException e) when (e.Code == ErrorGroups.Table.SchemaVersionMismatch &&
                                                    schemaVersion != e.GetExpectedSchemaVersion())
                    {
                        // Schema update detected after the batch was serialized.
                        schemaVersion = e.GetExpectedSchemaVersion();
                    }
                    catch (Exception e) when (e.CausedByUnmappedColumns())
                    {
                        schemaVersion = Table.SchemaVersionForceLatest;
                    }
                }
            }
            catch (Exception)
            {
                for (var i = 0; i < count; i++)
                {
                    failedItems.Enqueue(items[i]);
                }

                throw;
            }
            finally
            {
                buf.Dispose();
                GetPool<T>().Return(items);

                Metrics.StreamerItemsQueuedDecrement(count);
                Metrics.StreamerBatchesActiveDecrement();
            }
        }

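        // Background loop: periodically sends batches that have data but have not been flushed
        // within the configured AutoFlushInterval.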
        async Task AutoFlushAsync(CancellationToken flushCt)
        {
            while (!flushCt.IsCancellationRequested)
            {
                await Task.Delay(options.AutoFlushInterval, flushCt).ConfigureAwait(false);
                var ts = Stopwatch.GetTimestamp();

                foreach (var batch in batches.Values)
                {
                    // Stopwatch timestamps are in Stopwatch.Frequency units, not TimeSpan ticks,
                    // so convert before comparing with the flush interval.
                    if (batch.Count > 0 && Stopwatch.GetElapsedTime(batch.LastFlush, ts) > options.AutoFlushInterval)
                    {
                        await SendAsync(batch).ConfigureAwait(false);
                    }
                }
            }
        }

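        // Flushes all batches with pending items and awaits their in-flight send tasks.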
        async Task Drain()
        {
            foreach (var batch in batches.Values)
            {
                if (batch.Count > 0)
                {
                    await SendAsync(batch).ConfigureAwait(false);
                }

                await batch.Task.ConfigureAwait(false);
            }
        }

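        // Refreshes the cached schema under schemaLock, skipping the round-trip when another caller
        // has already fetched the requested (or a newer) version.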
        async ValueTask<Schema> UpdateSchema(int ver = Table.SchemaVersionForceLatest)
        {
            await schemaLock.WaitAsync(cancellationToken).ConfigureAwait(false);

            try
            {
                if (ver != Table.SchemaVersionForceLatest && schema.Version >= ver)
                {
                    return schema;
                }

                schema = await table.GetSchemaAsync(ver).ConfigureAwait(false);
                return schema;
            }
            finally
            {
                schemaLock.Release();
            }
        }
    }
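
For context, here is a minimal usage sketch showing how this internal method is typically reached through
the public streamer API of the Apache Ignite 3 .NET client. The table name, field names, and option values
are illustrative placeholders, and a .NET 8 project with implicit usings is assumed.

    // Minimal usage sketch (placeholder names; not taken from the file above).
    using Apache.Ignite;
    using Apache.Ignite.Table;

    using var client = await IgniteClient.StartAsync(new IgniteClientConfiguration("localhost:10800"));
    var table = await client.Tables.GetTableAsync("Person");

    var options = new DataStreamerOptions
    {
        PageSize = 1000,                             // A full batch is sent once it reaches this size.
        RetryLimit = 8,                              // Maps to RetryLimitPolicy in the method above.
        AutoFlushInterval = TimeSpan.FromSeconds(5), // Idle, partially filled batches are flushed at this rate.
    };

    // Each tuple is wrapped into a put-operation DataStreamerItem<T> before it reaches
    // DataStreamer.StreamDataAsync.
    await table!.RecordBinaryView.StreamDataAsync(GetData(), options);

    static async IAsyncEnumerable<IIgniteTuple> GetData()
    {
        for (var i = 0; i < 10_000; i++)
        {
            yield return new IgniteTuple { ["id"] = i, ["name"] = $"name-{i}" };
            await Task.Yield(); // Simulate an asynchronous producer.
        }
    }

PageSize controls when a full batch is sent, while AutoFlushInterval bounds how long a partially filled
batch may wait; both map directly onto the checks in the consumption loop and the AutoFlushAsync task above.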