in modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs [67:422]
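// Streams data into the table in per-partition batches, preserving per-partition item order.
//
// Flow, as implemented below:
// 1. Each item is serialized once into a BinaryTupleBuilder; the resulting hash picks the
//    target partition, and the serialized bytes are reused for the outgoing message
//    (the batch is re-serialized only if the schema changed in the meantime).
// 2. A full batch (options.PageSize items) is sent immediately; a background task
//    flushes stale batches on an options.AutoFlushInterval cadence.
// 3. Chaining each send on the batch's previous Task preserves order and provides backpressure.
// 4. On failure, all unsent and failed items are reported via DataStreamerException.
//
// Illustrative caller sketch (hypothetical usage; the public entry-point shape may differ):
//   IRecordView<IIgniteTuple> view = table.RecordBinaryView;
//   var opts = DataStreamerOptions.Default with { PageSize = 512 };
//   await view.StreamDataAsync(GetItemsAsync(), opts);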
internal static async Task StreamDataAsync<T>(
IAsyncEnumerable<DataStreamerItem<T>> data,
Table table,
IRecordSerializerHandler<T> writer,
DataStreamerOptions options,
CancellationToken cancellationToken)
{
IgniteArgumentCheck.NotNull(data);
ValidateOptions(options);
// ConcurrentDictionary is not necessary because we consume the source sequentially.
// However, locking on individual batches is required because the auto-flush background task accesses them concurrently.
var batches = new Dictionary<int, Batch<T>>();
var failedItems = new ConcurrentQueue<DataStreamerItem<T>>();
var retryPolicy = new RetryLimitPolicy { RetryLimit = options.RetryLimit };
var schema = await table.GetSchemaAsync(null).ConfigureAwait(false);
var schemaLock = new SemaphoreSlim(1);
var partitionAssignment = await table.GetPartitionAssignmentAsync().ConfigureAwait(false);
var partitionCount = partitionAssignment.Length; // Partition count never changes for a given table.
Debug.Assert(partitionCount > 0, "partitionCount > 0");
using var autoFlushCts = new CancellationTokenSource();
Task? autoFlushTask = null;
Exception? error = null;
try
{
autoFlushTask = AutoFlushAsync(autoFlushCts.Token);
await foreach (var item in data.WithCancellation(cancellationToken))
{
// WithCancellation passes the token to the producer.
// However, not all producers support cancellation, so we need to check it here as well.
cancellationToken.ThrowIfCancellationRequested();
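// Partition assignment instances are assumed to be cached by the table and replaced
// wholesale on topology change, so the reference comparison below is enough to detect an update.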
var newAssignment = await table.GetPartitionAssignmentAsync().ConfigureAwait(false);
if (newAssignment != partitionAssignment)
{
// Drain all batches to preserve order when partition assignment changes.
await Drain().ConfigureAwait(false);
partitionAssignment = newAssignment;
}
var batch = await AddWithRetryUnmapped(item).ConfigureAwait(false);
if (batch.Count >= options.PageSize)
{
await SendAsync(batch).ConfigureAwait(false);
}
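// Surface auto-flush failures promptly instead of waiting for the source to complete.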
if (autoFlushTask.IsFaulted)
{
await autoFlushTask.ConfigureAwait(false);
}
}
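// Source is exhausted: flush remaining buffered items and await all in-flight sends.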
await Drain().ConfigureAwait(false);
}
catch (Exception e)
{
error = e;
}
finally
{
await autoFlushCts.CancelAsync().ConfigureAwait(false);
if (autoFlushTask is { })
{
try
{
await autoFlushTask.ConfigureAwait(false);
}
catch (Exception e)
{
if (e is not OperationCanceledException)
{
error ??= e;
}
}
}
foreach (var batch in batches.Values)
{
lock (batch)
{
for (var i = 0; i < batch.Count; i++)
{
failedItems.Enqueue(batch.Items[i]);
}
batch.Buffer.Dispose();
GetPool<T>().Return(batch.Items);
Metrics.StreamerItemsQueuedDecrement(batch.Count);
Metrics.StreamerBatchesActiveDecrement();
}
}
if (error is { })
{
throw DataStreamerException.Create(error, failedItems);
}
if (!failedItems.IsEmpty)
{
// Should not happen: on the success path, Drain() leaves no buffered items behind.
throw DataStreamerException.Create(new InvalidOperationException("Some items were not processed."), failedItems);
}
}
return;
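// Adds an item, refreshing the schema once when serialization fails because the cached
// schema is missing columns (e.g. the table was altered after the schema was fetched).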
async ValueTask<Batch<T>> AddWithRetryUnmapped(DataStreamerItem<T> item)
{
try
{
return Add(item);
}
catch (Exception e) when (e.CausedByUnmappedColumns())
{
await UpdateSchema().ConfigureAwait(false);
return Add(item);
}
}
Batch<T> Add(DataStreamerItem<T> item)
{
var schema0 = schema;
var columnCount = item.OperationType == DataStreamerOperationType.Remove
? schema0.KeyColumns.Length
: schema0.Columns.Length;
var tupleBuilder = new BinaryTupleBuilder(columnCount, hashedColumnsPredicate: schema0.HashedColumnIndexProvider);
try
{
return Add0(item, ref tupleBuilder, schema0);
}
finally
{
tupleBuilder.Dispose();
}
}
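// Core of Add, split out presumably to keep the stackalloc'd span and the ref tupleBuilder
// in a single scope; see the CS8352 workaround comment below.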
Batch<T> Add0(DataStreamerItem<T> item, ref BinaryTupleBuilder tupleBuilder, Schema schema0)
{
var columnCount = schema0.Columns.Length;
// Use MemoryMarshal to work around [CS8352]: "Cannot use variable 'noValueSet' in this context
// because it may expose referenced variables outside of their declaration scope".
Span<byte> noValueSet = stackalloc byte[columnCount / 8 + 1];
Span<byte> noValueSetRef = MemoryMarshal.CreateSpan(ref MemoryMarshal.GetReference(noValueSet), noValueSet.Length);
var keyOnly = item.OperationType == DataStreamerOperationType.Remove;
writer.Write(ref tupleBuilder, item.Data, schema0, keyOnly: keyOnly, noValueSetRef);
var partitionId = Math.Abs(tupleBuilder.GetHash() % partitionCount);
var batch = GetOrCreateBatch(partitionId);
lock (batch)
{
batch.Items[batch.Count++] = item;
if (batch.Schema != schema0)
{
batch.SchemaOutdated = true;
}
// 1. To compute target partition, we need key hash.
// 2. To compute key hash, we need to serialize the key.
// 3. Since we already serialized the key, we can use it for the message body and avoid re-serialization.
// However, if the schema gets updated, we need to re-serialize the whole batch.
// Schema update is rare, so we optimize for the happy path.
if (!batch.SchemaOutdated)
{
noValueSet.CopyTo(batch.Buffer.MessageWriter.WriteBitSet(columnCount));
batch.Buffer.MessageWriter.Write(tupleBuilder.Build().Span);
}
}
Metrics.StreamerItemsQueuedIncrement();
return batch;
}
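// CollectionsMarshal.GetValueRefOrAddDefault performs the get-or-add in a single dictionary
// lookup, writing the new batch straight into the dictionary slot through the returned ref.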
Batch<T> GetOrCreateBatch(int partitionId)
{
ref var batchRef = ref CollectionsMarshal.GetValueRefOrAddDefault(batches, partitionId, out _);
if (batchRef == null)
{
batchRef = new Batch<T>(options.PageSize, schema, partitionId);
InitBuffer(batchRef, schema);
Metrics.StreamerBatchesActiveIncrement();
}
return batchRef;
}
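// Swaps the batch contents out under the lock and sends them in the background:
// the batch immediately gets a fresh item array and buffer so producers can keep adding
// while the previous page is in flight; awaiting batch.Task first preserves per-partition
// order and throttles producers (backpressure).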
async Task SendAsync(Batch<T> batch)
{
var expectedSize = batch.Count;
// Wait for the previous task for this batch to finish: preserve order, backpressure control.
await batch.Task.ConfigureAwait(false);
lock (batch)
{
if (batch.Count != expectedSize || batch.Count == 0)
{
// A concurrent add or flush changed the batch; this send is stale, skip it.
return;
}
FinalizeBatchHeader(batch);
batch.Task = SendAndDisposeBufAsync(
batch.Buffer, batch.PartitionId, batch.Task, batch.Items, batch.Count, batch.SchemaOutdated, batch.Schema.Version);
batch.Items = GetPool<T>().Rent(options.PageSize);
batch.Count = 0;
batch.Schema = schema;
batch.SchemaOutdated = false;
batch.Buffer = ProtoCommon.GetMessageWriter(); // Prev buf will be disposed in SendAndDisposeBufAsync.
InitBuffer(batch, batch.Schema);
batch.LastFlush = Stopwatch.GetTimestamp();
Metrics.StreamerBatchesActiveIncrement();
}
}
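// Takes ownership of the buffer and the items array (both are released in finally).
// Re-serializes and retries on schema version mismatch reported by the server; on
// terminal failure, enqueues the items into failedItems for the caller to report.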
async Task SendAndDisposeBufAsync(
PooledArrayBuffer buf,
int partitionId,
Task oldTask,
DataStreamerItem<T>[] items,
int count,
bool batchSchemaOutdated,
int batchSchemaVer)
{
try
{
Debug.Assert(items.Length > 0, "items.Length > 0");
var schema0 = schema;
if (batchSchemaOutdated || batchSchemaVer < schema0.Version)
{
// Schema update was detected while the batch was being filled.
// Re-serialize the whole batch.
ReWriteBatch(buf, partitionId, schema0, items.AsSpan(0, count), writer);
}
// ReSharper disable once AccessToModifiedClosure
var preferredNode = PreferredNode.FromName(partitionAssignment[partitionId] ?? string.Empty);
int? schemaVersion = null;
while (true)
{
try
{
if (schemaVersion is { })
{
// Serialize again with the new schema.
schema0 = await UpdateSchema(schemaVersion.Value).ConfigureAwait(false);
ReWriteBatch(buf, partitionId, schema0, items.AsSpan(0, count), writer);
}
// Wait for the previous batch for this node to preserve item order.
await oldTask.ConfigureAwait(false);
await SendBatchAsync(table, buf, count, preferredNode, retryPolicy).ConfigureAwait(false);
return;
}
catch (IgniteException e) when (e.Code == ErrorGroups.Table.SchemaVersionMismatch &&
schemaVersion != e.GetExpectedSchemaVersion())
{
// Schema update detected after the batch was serialized.
schemaVersion = e.GetExpectedSchemaVersion();
}
catch (Exception e) when (e.CausedByUnmappedColumns())
{
schemaVersion = Table.SchemaVersionForceLatest;
}
}
}
catch (Exception)
{
for (var i = 0; i < count; i++)
{
failedItems.Enqueue(items[i]);
}
throw;
}
finally
{
buf.Dispose();
GetPool<T>().Return(items);
Metrics.StreamerItemsQueuedDecrement(count);
Metrics.StreamerBatchesActiveDecrement();
}
}
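// Background loop: flushes batches that have items but have not been flushed for at
// least AutoFlushInterval, so slow producers don't leave partial pages sitting in memory.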
async Task AutoFlushAsync(CancellationToken flushCt)
{
while (!flushCt.IsCancellationRequested)
{
await Task.Delay(options.AutoFlushInterval, flushCt).ConfigureAwait(false);
var ts = Stopwatch.GetTimestamp();
foreach (var batch in batches.Values)
{
// Stopwatch timestamps are not TimeSpan ticks; convert the delta via GetElapsedTime.
if (batch.Count > 0 && Stopwatch.GetElapsedTime(batch.LastFlush, ts) > options.AutoFlushInterval)
{
await SendAsync(batch).ConfigureAwait(false);
}
}
}
}
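// Sends every non-empty batch and awaits all in-flight sends. Used on normal completion
// and before switching to a new partition assignment, to preserve ordering guarantees.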
async Task Drain()
{
foreach (var batch in batches.Values)
{
if (batch.Count > 0)
{
await SendAsync(batch).ConfigureAwait(false);
}
await batch.Task.ConfigureAwait(false);
}
}
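// Serializes schema refreshes: the semaphore collapses concurrent refresh requests into
// one GetSchemaAsync call, and the version check skips refreshes already done by another caller.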
async ValueTask<Schema> UpdateSchema(int ver = Table.SchemaVersionForceLatest)
{
await schemaLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (ver != Table.SchemaVersionForceLatest && schema.Version >= ver)
{
return schema;
}
schema = await table.GetSchemaAsync(ver).ConfigureAwait(false);
return schema;
}
finally
{
schemaLock.Release();
}
}
}