in modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs [62:286]
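// Callers reach this method through the public streamer API rather than directly.
// Illustrative entry point (a sketch; names per IDataStreamerTarget, exact call site assumed,
// GetData() is a hypothetical IAsyncEnumerable<IIgniteTuple> source):
//
//     IRecordView<IIgniteTuple> view = table.RecordBinaryView;
//     await view.StreamDataAsync(GetData(), DataStreamerOptions.Default, cancellationToken);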
internal static async Task StreamDataAsync<T>(
IAsyncEnumerable<T> data,
Func<PooledArrayBuffer, string, IRetryPolicy, Task> sender,
IRecordSerializerHandler<T> writer,
Func<Task<Schema>> schemaProvider,
Func<ValueTask<string[]?>> partitionAssignmentProvider,
DataStreamerOptions options,
CancellationToken cancellationToken)
{
IgniteArgumentCheck.NotNull(data);
IgniteArgumentCheck.Ensure(options.BatchSize > 0, $"{nameof(options.BatchSize)} should be positive.");
IgniteArgumentCheck.Ensure(options.AutoFlushFrequency > TimeSpan.Zero, $"{nameof(options.AutoFlushFrequency)} should be positive.");
IgniteArgumentCheck.Ensure(options.RetryLimit >= 0, $"{nameof(options.RetryLimit)} should be non-negative.");
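// Defaults (assumed from DataStreamerOptions at the time of writing): BatchSize = 1000,
// AutoFlushFrequency = 5 seconds, RetryLimit = 16.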
// ConcurrentDictionary is not necessary because we consume the source sequentially.
// However, locking for batches is required due to the auto-flush background task.
var batches = new Dictionary<string, Batch>();
var retryPolicy = new RetryLimitPolicy { RetryLimit = options.RetryLimit };
// TODO: IGNITE-19710 Data Streamer schema synchronization
var schema = await schemaProvider().ConfigureAwait(false);
var partitionAssignment = await partitionAssignmentProvider().ConfigureAwait(false);
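// The assignment maps partition index -> node id; null means no assignment is known yet
// and everything goes through the default connection (see Add0 below).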
var lastPartitionsAssignmentCheck = Stopwatch.StartNew();
using var flushCts = new CancellationTokenSource();
try
{
_ = AutoFlushAsync(flushCts.Token);
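// Fire-and-forget: cancelling flushCts (in the finally block below) stops the loop;
// the resulting TaskCanceledException lands in the discarded task and is never observed.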
await foreach (var item in data.WithCancellation(cancellationToken))
{
// WithCancellation passes the token to the producer.
// However, not all producers support cancellation, so we need to check it here as well.
cancellationToken.ThrowIfCancellationRequested();
var (batch, partition) = Add(item);
if (batch.Count >= options.BatchSize)
{
await SendAsync(batch, partition).ConfigureAwait(false);
}
if (lastPartitionsAssignmentCheck.Elapsed > PartitionAssignmentUpdateFrequency)
{
var newAssignment = await partitionAssignmentProvider().ConfigureAwait(false);
if (newAssignment != partitionAssignment) // Reference comparison: the provider is expected to return a cached array.
{
// Drain all batches to preserve order when partition assignment changes.
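// Example: a key that previously hashed to a batch for node A may now belong to node B;
// flushing everything first keeps per-key ordering across the assignment switch.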
await Drain().ConfigureAwait(false);
partitionAssignment = newAssignment;
}
lastPartitionsAssignmentCheck.Restart();
}
}
await Drain().ConfigureAwait(false);
}
finally
{
flushCts.Cancel();
foreach (var batch in batches.Values)
{
batch.Buffer.Dispose();
Metrics.StreamerItemsQueuedDecrement(batch.Count);
Metrics.StreamerBatchesActiveDecrement();
}
}
(Batch Batch, string Partition) Add(T item)
{
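// The schema doubles as the hashed-columns predicate, so the builder accumulates a
// colocation hash while writing (consumed by GetHash() for partition routing below).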
var tupleBuilder = new BinaryTupleBuilder(schema.Columns.Count, hashedColumnsPredicate: schema);
try
{
return Add0(item, ref tupleBuilder);
}
finally
{
tupleBuilder.Dispose();
}
}
(Batch Batch, string Partition) Add0(T item, ref BinaryTupleBuilder tupleBuilder)
{
var columnCount = schema.Columns.Count;
// Use MemoryMarshal to work around [CS8352]: "Cannot use variable 'noValueSet' in this context
// because it may expose referenced variables outside of their declaration scope".
Span<byte> noValueSet = stackalloc byte[columnCount / 8 + 1];
Span<byte> noValueSetRef = MemoryMarshal.CreateSpan(ref MemoryMarshal.GetReference(noValueSet), noValueSet.Length); // Length must match the stackalloc size in bytes, not the column count.
writer.Write(ref tupleBuilder, item, schema, columnCount, noValueSetRef);
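// Route by colocation hash. Example: with a 25-partition assignment and GetHash() == -7,
// the index is Math.Abs(-7 % 25) == 7 (C# % keeps the sign of the dividend).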
var partition = partitionAssignment == null
? string.Empty // Default connection.
: partitionAssignment[Math.Abs(tupleBuilder.GetHash() % partitionAssignment.Length)];
var batch = GetOrCreateBatch(partition);
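// Count and Buffer are mutated only under the batch lock: the auto-flush task may be
// flushing this same batch concurrently (see AutoFlushAsync / SendAsync).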
lock (batch)
{
batch.Count++;
noValueSet.CopyTo(batch.Buffer.MessageWriter.WriteBitSet(columnCount));
batch.Buffer.MessageWriter.Write(tupleBuilder.Build().Span);
}
Metrics.StreamerItemsQueuedIncrement();
return (batch, partition);
}
Batch GetOrCreateBatch(string partition)
{
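// Single-lookup get-or-add: the ref targets the dictionary slot itself, so assigning
// batchRef stores the new Batch without a second hash lookup.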
ref var batchRef = ref CollectionsMarshal.GetValueRefOrAddDefault(batches, partition, out _);
if (batchRef == null)
{
batchRef = new Batch();
InitBuffer(batchRef);
Metrics.StreamerBatchesActiveIncrement();
}
return batchRef;
}
async Task SendAsync(Batch batch, string partition)
{
var expectedSize = batch.Count;
// Wait for the previous task for this batch to finish: preserve order, backpressure control.
await batch.Task.ConfigureAwait(false);
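// The auto-flush task may have flushed this batch while we were awaiting above,
// so re-check the size under the lock before sending.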
lock (batch)
{
if (batch.Count != expectedSize || batch.Count == 0)
{
// Concurrent update happened.
return;
}
var buf = batch.Buffer;
// See RecordSerializer.WriteMultiple.
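// Patch the 5 bytes reserved by InitBuffer: one MsgPack Int32 code byte (0xd2)
// followed by the item count as a 4-byte big-endian value.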
buf.WriteByte(MsgPackCode.Int32, batch.CountPos);
buf.WriteIntBigEndian(batch.Count, batch.CountPos + 1);
batch.Task = SendAndDisposeBufAsync(buf, partition, batch.Task, batch.Count);
batch.Count = 0;
batch.Buffer = ProtoCommon.GetMessageWriter(); // Prev buf will be disposed in SendAndDisposeBufAsync.
InitBuffer(batch);
batch.LastFlush = Stopwatch.GetTimestamp();
Metrics.StreamerBatchesActiveIncrement();
}
}
async Task SendAndDisposeBufAsync(PooledArrayBuffer buf, string partition, Task oldTask, int count)
{
try
{
// Wait for the previous batch for this node to preserve item order.
await oldTask.ConfigureAwait(false);
await sender(buf, partition, retryPolicy).ConfigureAwait(false);
Metrics.StreamerBatchesSent.Add(1);
Metrics.StreamerItemsSent.Add(count);
}
finally
{
buf.Dispose();
Metrics.StreamerItemsQueuedDecrement(count);
Metrics.StreamerBatchesActiveDecrement();
}
}
async Task AutoFlushAsync(CancellationToken flushCt)
{
while (!flushCt.IsCancellationRequested)
{
await Task.Delay(options.AutoFlushFrequency, flushCt).ConfigureAwait(false);
var ts = Stopwatch.GetTimestamp();
foreach (var (partition, batch) in batches)
{
// Stopwatch timestamps are in Stopwatch.Frequency units, not TimeSpan ticks, so convert before comparing.
var elapsed = TimeSpan.FromSeconds((double)(ts - batch.LastFlush) / Stopwatch.Frequency);
if (batch.Count > 0 && elapsed > options.AutoFlushFrequency)
{
await SendAsync(batch, partition).ConfigureAwait(false);
}
}
}
}
void InitBuffer(Batch batch)
{
var buf = batch.Buffer;
var w = buf.MessageWriter;
w.Write(schema.TableId);
w.WriteTx(null);
w.Write(schema.Version);
batch.CountPos = buf.Position;
buf.Advance(5); // Reserve the count slot: 1 code byte + 4-byte big-endian value, patched in SendAsync.
}
async Task Drain()
{
foreach (var (partition, batch) in batches)
{
if (batch.Count > 0)
{
await SendAsync(batch, partition).ConfigureAwait(false);
}
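// Await the last in-flight send for this partition even when nothing was left to flush.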
await batch.Task.ConfigureAwait(false);
}
}
}