in modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs [77:399]
internal static async Task StreamDataAsync<TSource, TKey, TPayload, TArg, TResult>(
    IAsyncEnumerable<TSource> data,
    Table table,
    Func<TSource, TKey> keySelector,
    Func<TSource, TPayload> payloadSelector,
    IRecordSerializerHandler<TKey> keyWriter,
    DataStreamerOptions options,
    Channel<TResult>? resultChannel,
    IEnumerable<DeploymentUnit> units,
    string receiverClassName,
    TArg receiverArg,
    CancellationToken cancellationToken)
    where TKey : notnull
    where TPayload : notnull
{
    IgniteArgumentCheck.NotNull(data);
    DataStreamer.ValidateOptions(options);

    // ConcurrentDictionary is not necessary because we consume the source sequentially.
    // However, locking for batches is required due to the auto-flush background task.
    var batches = new Dictionary<int, Batch<TSource, TPayload>>();
    var failedItems = new ConcurrentQueue<TSource>();
    var retryPolicy = new RetryLimitPolicy { RetryLimit = options.RetryLimit };
    var units0 = units as ICollection<DeploymentUnit> ?? units.ToList(); // Avoid multiple enumeration.

    var schema = await table.GetSchemaAsync(null).ConfigureAwait(false);
    var partitionAssignment = await table.GetPartitionAssignmentAsync().ConfigureAwait(false);
    var partitionCount = partitionAssignment.Length; // Can't be changed.
    Debug.Assert(partitionCount > 0, "partitionCount > 0");
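    // The payload type is captured from the first item and must be the same for every subsequent item (enforced in Add0).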
    Type? payloadType = null;

    using var autoFlushCts = new CancellationTokenSource();
    Task? autoFlushTask = null;
    Exception? error = null;

    try
    {
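        // Start the background auto-flush loop; it is cancelled and awaited in the finally block below.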
        autoFlushTask = AutoFlushAsync(autoFlushCts.Token);

        await foreach (var item in data.WithCancellation(cancellationToken))
        {
            // WithCancellation passes the token to the producer.
            // However, not all producers support cancellation, so we need to check it here as well.
            cancellationToken.ThrowIfCancellationRequested();

            var newAssignment = await table.GetPartitionAssignmentAsync().ConfigureAwait(false);
            if (newAssignment != partitionAssignment)
            {
                // Drain all batches to preserve order when partition assignment changes.
                await Drain().ConfigureAwait(false);
                partitionAssignment = newAssignment;
            }

            var batch = Add(item);
            if (batch.Count >= options.PageSize)
            {
                await SendAsync(batch).ConfigureAwait(false);
            }
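            // Surface auto-flush failures immediately instead of continuing to consume the source.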
            if (autoFlushTask.IsFaulted)
            {
                await autoFlushTask.ConfigureAwait(false);
            }
        }

        await Drain().ConfigureAwait(false);
    }
    catch (Exception e)
    {
        error = e;
    }
    finally
    {
        await autoFlushCts.CancelAsync().ConfigureAwait(false);

        if (autoFlushTask is { })
        {
            try
            {
                await autoFlushTask.ConfigureAwait(false);
            }
            catch (Exception e)
            {
                if (e is not OperationCanceledException)
                {
                    error ??= e;
                }
            }
        }
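        // Hand any unsent items back to the caller via failedItems and return the pooled buffers.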
        foreach (var batch in batches.Values)
        {
            lock (batch)
            {
                for (var i = 0; i < batch.Count; i++)
                {
                    failedItems.Enqueue(batch.SourceItems[i]);
                }

                GetPool<TPayload>().Return(batch.Items);
                GetPool<TSource>().Return(batch.SourceItems);

                Metrics.StreamerItemsQueuedDecrement(batch.Count);
                Metrics.StreamerBatchesActiveDecrement();
            }
        }

        if (error is { })
        {
            throw DataStreamerException.Create(error, failedItems);
        }

        if (!failedItems.IsEmpty)
        {
            // Should not happen.
            throw DataStreamerException.Create(new InvalidOperationException("Some items were not processed."), failedItems);
        }
    }

    return;
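    // The local functions below capture the streamer state (schema, batches, partition assignment, options) from the enclosing method.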
    Batch<TSource, TPayload> Add(TSource item)
    {
        var tupleBuilder = new BinaryTupleBuilder(schema.KeyColumns.Length, hashedColumnsPredicate: schema.HashedColumnIndexProvider);

        try
        {
            return Add0(item, ref tupleBuilder);
        }
        finally
        {
            tupleBuilder.Dispose();
        }
    }

    Batch<TSource, TPayload> Add0(TSource item, ref BinaryTupleBuilder tupleBuilder)
    {
        // Write key to compute hash.
        var key = keySelector(item);
        keyWriter.Write(ref tupleBuilder, key, schema, keyOnly: true, Span<byte>.Empty);
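        // The key hash determines the target partition; items mapped to the same partition go into the same batch.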
        var partitionId = Math.Abs(tupleBuilder.GetHash() % partitionCount);
        var batch = GetOrCreateBatch(partitionId);

        var payload = payloadSelector(item);
        IgniteArgumentCheck.NotNull(payload);

        if (payloadType == null)
        {
            payloadType = payload.GetType();
        }
        else if (payloadType != payload.GetType())
        {
            throw new InvalidOperationException(
                $"All streamer items returned by payloadSelector must be of the same type. " +
                $"Expected: {payloadType}, actual: {payload.GetType()}.");
        }

        lock (batch)
        {
            batch.Items[batch.Count] = payload;
            batch.SourceItems[batch.Count] = item;
            batch.Count++;
        }

        Metrics.StreamerItemsQueuedIncrement();

        return batch;
    }

    Batch<TSource, TPayload> GetOrCreateBatch(int partitionId)
    {
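        // GetValueRefOrAddDefault adds a null entry when missing and returns a ref to it, avoiding a second dictionary lookup.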
        ref var batchRef = ref CollectionsMarshal.GetValueRefOrAddDefault(batches, partitionId, out _);

        if (batchRef == null)
        {
            batchRef = new Batch<TSource, TPayload>(options.PageSize, partitionId);
            Metrics.StreamerBatchesActiveIncrement();
        }

        return batchRef;
    }

    async Task SendAsync(Batch<TSource, TPayload> batch)
    {
        var expectedSize = batch.Count;

        // Wait for the previous task for this batch to finish: preserve order, backpressure control.
        await batch.Task.ConfigureAwait(false);

        lock (batch)
        {
            if (batch.Count != expectedSize || batch.Count == 0)
            {
                // Concurrent update happened.
                return;
            }

            batch.Task = SendAndDisposeBufAsync(batch.PartitionId, batch.Task, batch.Items, batch.SourceItems, batch.Count);
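            // Swap in fresh pooled buffers; the previous ones are returned by SendAndDisposeBufAsync when the send completes.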
            batch.Items = GetPool<TPayload>().Rent(options.PageSize);
            batch.SourceItems = GetPool<TSource>().Rent(options.PageSize);
            batch.Count = 0;
            batch.LastFlush = Stopwatch.GetTimestamp();

            Metrics.StreamerBatchesActiveIncrement();
        }
    }
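    // Owns the passed item buffers: they are returned to the pools in the finally block once the send finishes.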
    async Task SendAndDisposeBufAsync(
        int partitionId,
        Task oldTask,
        TPayload[] items,
        TSource[] sourceItems,
        int count)
    {
        // Release the thread that holds the batch lock.
        await Task.Yield();

        var buf = ProtoCommon.GetMessageWriter();
        TResult[]? results = null;

        try
        {
            SerializeBatch(buf, items.AsSpan(0, count), partitionId);

            // ReSharper disable once AccessToModifiedClosure
            var preferredNode = PreferredNode.FromName(partitionAssignment[partitionId] ?? string.Empty);

            // Wait for the previous batch for this node to preserve item order.
            await oldTask.ConfigureAwait(false);

            (results, int resultsCount) = await SendBatchAsync<TResult>(
                table, buf, count, preferredNode, retryPolicy, expectResults: resultChannel != null).ConfigureAwait(false);

            if (results != null && resultChannel != null)
            {
                for (var i = 0; i < resultsCount; i++)
                {
                    TResult result = results[i];
                    await resultChannel.Writer.WriteAsync(result, cancellationToken).ConfigureAwait(false);
                }
            }
        }
        catch (ChannelClosedException)
        {
            // Consumer does not want more results, stop returning them, but keep streaming.
            resultChannel = null;
        }
        catch (Exception)
        {
            for (var i = 0; i < count; i++)
            {
                failedItems.Enqueue(sourceItems[i]);
            }

            throw;
        }
        finally
        {
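            // Always return the pooled buffers and update metrics, whether the send succeeded or failed.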
            buf.Dispose();

            GetPool<TPayload>().Return(items);
            GetPool<TSource>().Return(sourceItems);

            if (results != null)
            {
                GetPool<TResult>().Return(results);
            }

            Metrics.StreamerItemsQueuedDecrement(count);
            Metrics.StreamerBatchesActiveDecrement();
        }
    }
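    // Periodically flushes batches that have pending items and have not been flushed within the configured auto-flush interval.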
    async Task AutoFlushAsync(CancellationToken flushCt)
    {
        while (!flushCt.IsCancellationRequested)
        {
            await Task.Delay(options.AutoFlushInterval, flushCt).ConfigureAwait(false);
            var ts = Stopwatch.GetTimestamp();

            foreach (var batch in batches.Values)
            {
                if (batch.Count > 0 && ts - batch.LastFlush > options.AutoFlushInterval.Ticks)
                {
                    await SendAsync(batch).ConfigureAwait(false);
                }
            }
        }
    }
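    // Flushes every non-empty batch and waits for all in-flight sends to complete.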
    async Task Drain()
    {
        foreach (var batch in batches.Values)
        {
            if (batch.Count > 0)
            {
                await SendAsync(batch).ConfigureAwait(false);
            }

            await batch.Task.ConfigureAwait(false);
        }
    }
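    // Serializes the batch header (table id, partition id, deployment units, results flag) followed by the receiver payload (class name, argument, items).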
    void SerializeBatch<T>(
        PooledArrayBuffer buf,
        Span<T> items,
        int partitionId)
    {
        // T is one of the supported types (numbers, strings, etc.).
        var w = buf.MessageWriter;

        w.Write(table.Id);
        w.Write(partitionId);

        Compute.WriteUnits(units0, buf);

        var expectResults = resultChannel != null;
        w.Write(expectResults);

        WriteReceiverPayload(ref w, receiverClassName, receiverArg, items);
    }
}