modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs (195 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ namespace Apache.Ignite.Internal.Table; using System; using System.Collections.Generic; using System.Diagnostics; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; using Buffers; using Common; using Ignite.Table; using Proto; using Proto.BinaryTuple; using Proto.MsgPack; using Serialization; /// <summary> /// Data streamer. /// <para /> /// Implementation notes: /// * Hashing is combined with serialization (unlike Java client), so we write binary tuples to a per-node buffer right away. /// - Pros: cheaper happy path; /// - Cons: will require re-serialization on schema update. /// * Iteration and serialization are sequential. /// * Batches are sent asynchronously; we wait for the previous batch only when the next batch for the given node is full. /// * The more connections to different servers we have - the more parallelism we get (see benchmark). /// * There is no parallelism for the same node, because we need to guarantee ordering. /// </summary> internal static class DataStreamer { private static readonly TimeSpan PartitionAssignmentUpdateFrequency = TimeSpan.FromSeconds(15); /// <summary> /// Streams the data. /// </summary> /// <param name="data">Data.</param> /// <param name="sender">Batch sender.</param> /// <param name="writer">Item writer.</param> /// <param name="schemaProvider">Schema provider.</param> /// <param name="partitionAssignmentProvider">Partitioner.</param> /// <param name="options">Options.</param> /// <param name="cancellationToken">Cancellation token.</param> /// <typeparam name="T">Element type.</typeparam> /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns> internal static async Task StreamDataAsync<T>( IAsyncEnumerable<T> data, Func<PooledArrayBuffer, string, IRetryPolicy, Task> sender, IRecordSerializerHandler<T> writer, Func<Task<Schema>> schemaProvider, Func<ValueTask<string[]?>> partitionAssignmentProvider, DataStreamerOptions options, CancellationToken cancellationToken) { IgniteArgumentCheck.NotNull(data); IgniteArgumentCheck.Ensure(options.BatchSize > 0, $"{nameof(options.BatchSize)} should be positive."); IgniteArgumentCheck.Ensure(options.AutoFlushFrequency > TimeSpan.Zero, $"{nameof(options.AutoFlushFrequency)} should be positive."); IgniteArgumentCheck.Ensure(options.RetryLimit >= 0, $"{nameof(options.RetryLimit)} should be non-negative."); // ConcurrentDictionary is not necessary because we consume the source sequentially. // However, locking for batches is required due to auto-flush background task. var batches = new Dictionary<string, Batch>(); var retryPolicy = new RetryLimitPolicy { RetryLimit = options.RetryLimit }; // TODO: IGNITE-19710 Data Streamer schema synchronization var schema = await schemaProvider().ConfigureAwait(false); var partitionAssignment = await partitionAssignmentProvider().ConfigureAwait(false); var lastPartitionsAssignmentCheck = Stopwatch.StartNew(); using var flushCts = new CancellationTokenSource(); try { _ = AutoFlushAsync(flushCts.Token); await foreach (var item in data.WithCancellation(cancellationToken)) { // WithCancellation passes the token to the producer. // However, not all producers support cancellation, so we need to check it here as well. cancellationToken.ThrowIfCancellationRequested(); var (batch, partition) = Add(item); if (batch.Count >= options.BatchSize) { await SendAsync(batch, partition).ConfigureAwait(false); } if (lastPartitionsAssignmentCheck.Elapsed > PartitionAssignmentUpdateFrequency) { var newAssignment = await partitionAssignmentProvider().ConfigureAwait(false); if (newAssignment != partitionAssignment) { // Drain all batches to preserve order when partition assignment changes. await Drain().ConfigureAwait(false); partitionAssignment = newAssignment; } lastPartitionsAssignmentCheck.Restart(); } } await Drain().ConfigureAwait(false); } finally { flushCts.Cancel(); foreach (var batch in batches.Values) { batch.Buffer.Dispose(); Metrics.StreamerItemsQueuedDecrement(batch.Count); Metrics.StreamerBatchesActiveDecrement(); } } (Batch Batch, string Partition) Add(T item) { var tupleBuilder = new BinaryTupleBuilder(schema.Columns.Count, hashedColumnsPredicate: schema); try { return Add0(item, ref tupleBuilder); } finally { tupleBuilder.Dispose(); } } (Batch Batch, string Partition) Add0(T item, ref BinaryTupleBuilder tupleBuilder) { var columnCount = schema.Columns.Count; // Use MemoryMarshal to work around [CS8352]: "Cannot use variable 'noValueSet' in this context // because it may expose referenced variables outside of their declaration scope". Span<byte> noValueSet = stackalloc byte[columnCount / 8 + 1]; Span<byte> noValueSetRef = MemoryMarshal.CreateSpan(ref MemoryMarshal.GetReference(noValueSet), columnCount); writer.Write(ref tupleBuilder, item, schema, columnCount, noValueSetRef); var partition = partitionAssignment == null ? string.Empty // Default connection. : partitionAssignment[Math.Abs(tupleBuilder.GetHash() % partitionAssignment.Length)]; var batch = GetOrCreateBatch(partition); lock (batch) { batch.Count++; noValueSet.CopyTo(batch.Buffer.MessageWriter.WriteBitSet(columnCount)); batch.Buffer.MessageWriter.Write(tupleBuilder.Build().Span); } Metrics.StreamerItemsQueuedIncrement(); return (batch, partition); } Batch GetOrCreateBatch(string partition) { ref var batchRef = ref CollectionsMarshal.GetValueRefOrAddDefault(batches, partition, out _); if (batchRef == null) { batchRef = new Batch(); InitBuffer(batchRef); Metrics.StreamerBatchesActiveIncrement(); } return batchRef; } async Task SendAsync(Batch batch, string partition) { var expectedSize = batch.Count; // Wait for the previous task for this batch to finish: preserve order, backpressure control. await batch.Task.ConfigureAwait(false); lock (batch) { if (batch.Count != expectedSize || batch.Count == 0) { // Concurrent update happened. return; } var buf = batch.Buffer; // See RecordSerializer.WriteMultiple. buf.WriteByte(MsgPackCode.Int32, batch.CountPos); buf.WriteIntBigEndian(batch.Count, batch.CountPos + 1); batch.Task = SendAndDisposeBufAsync(buf, partition, batch.Task, batch.Count); batch.Count = 0; batch.Buffer = ProtoCommon.GetMessageWriter(); // Prev buf will be disposed in SendAndDisposeBufAsync. InitBuffer(batch); batch.LastFlush = Stopwatch.GetTimestamp(); Metrics.StreamerBatchesActiveIncrement(); } } async Task SendAndDisposeBufAsync(PooledArrayBuffer buf, string partition, Task oldTask, int count) { try { // Wait for the previous batch for this node to preserve item order. await oldTask.ConfigureAwait(false); await sender(buf, partition, retryPolicy).ConfigureAwait(false); Metrics.StreamerBatchesSent.Add(1); Metrics.StreamerItemsSent.Add(count); } finally { buf.Dispose(); Metrics.StreamerItemsQueuedDecrement(count); Metrics.StreamerBatchesActiveDecrement(); } } async Task AutoFlushAsync(CancellationToken flushCt) { while (!flushCt.IsCancellationRequested) { await Task.Delay(options.AutoFlushFrequency, flushCt).ConfigureAwait(false); var ts = Stopwatch.GetTimestamp(); foreach (var (partition, batch) in batches) { if (batch.Count > 0 && ts - batch.LastFlush > options.AutoFlushFrequency.Ticks) { await SendAsync(batch, partition).ConfigureAwait(false); } } } } void InitBuffer(Batch batch) { var buf = batch.Buffer; var w = buf.MessageWriter; w.Write(schema.TableId); w.WriteTx(null); w.Write(schema.Version); batch.CountPos = buf.Position; buf.Advance(5); // Reserve count. } async Task Drain() { foreach (var (partition, batch) in batches) { if (batch.Count > 0) { await SendAsync(batch, partition).ConfigureAwait(false); } await batch.Task.ConfigureAwait(false); } } } private sealed record Batch { public PooledArrayBuffer Buffer { get; set; } = ProtoCommon.GetMessageWriter(); public int Count { get; set; } public int CountPos { get; set; } public Task Task { get; set; } = Task.CompletedTask; // Task for the previous buffer. public long LastFlush { get; set; } } }