code/KustoCopyConsole/Storage/RowItemGateway.cs (242 lines of code) (raw):

using KustoCopyConsole.Entity;
using KustoCopyConsole.Entity.InMemory;
using KustoCopyConsole.Entity.RowItems;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace KustoCopyConsole.Storage
{
    /// <summary>
    /// Keeps an in-memory cache of row items and persists appended items to a
    /// <see cref="LogStorage"/> from a background task.
    /// Items are serialized, queued and flushed in batches; once enough volume
    /// has been appended, a snapshot of the cache is written as the "latest view".
    /// </summary>
    internal class RowItemGateway : IAsyncDisposable
    {
        #region Inner Types
        /// <summary>One serialized row item waiting to be appended to the log.</summary>
        /// <param name="EnqueueTime">UTC time at which the item was queued.</param>
        /// <param name="Buffer">Serialized (UTF-8) representation of the item.</param>
        /// <param name="SnapshotCache">
        /// When not <c>null</c>, cache to write as latest view once the batch
        /// containing this item has been appended.
        /// </param>
        /// <param name="TaskSource">
        /// When not <c>null</c>, source to complete once the batch containing
        /// this item has been appended.
        /// </param>
        private record QueuedRowItem(
            DateTime EnqueueTime,
            byte[] Buffer,
            RowItemInMemoryCache? SnapshotCache,
            TaskCompletionSource? TaskSource);
        #endregion

        //  Serialized volume (in bytes) appended before a new snapshot is requested
        private const long MAX_VOLUME_BEFORE_SNAPSHOT = 20000000;
        //  If the oldest queued item has less than this left to wait, flush right away
        private static readonly TimeSpan MIN_WAIT_PERIOD = TimeSpan.FromSeconds(1);
        //  Target age of the oldest queued item before a flush
        private static readonly TimeSpan FLUSH_PERIOD = TimeSpan.FromSeconds(5);
        private static readonly RowItemSerializer _rowItemSerializer = CreateRowItemSerializer();

        private readonly LogStorage _logStorage;
        private readonly ConcurrentQueue<QueuedRowItem> _rowItemQueue = new();
        private readonly ConcurrentQueue<Task> _releaseSourceTaskQueue = new();
        private readonly Task _backgroundTask;
        private readonly TaskCompletionSource _backgroundCompletedSource = new();
        private readonly object _lock = new object();
        //  Only read / written while holding _lock
        private long _volumeSinceLastSnapshot = 0;
        private volatile RowItemInMemoryCache _inMemoryCache;

        #region Construction
        private RowItemGateway(
            LogStorage logStorage,
            RowItemInMemoryCache cache,
            CancellationToken ct)
        {
            _logStorage = logStorage;
            //  Assign all state before the background task starts running
            _inMemoryCache = cache;
            _backgroundTask = Task.Run(() => BackgroundPersistanceAsync(ct));
        }

        /// <summary>
        /// Rebuilds the in-memory cache from the latest view read from
        /// <paramref name="logStorage"/>, writes that cache back as the latest view
        /// and returns a gateway whose background persistence is started.
        /// </summary>
        /// <param name="logStorage">Storage to read from and persist to.</param>
        /// <param name="ct">Token passed to storage operations and the background task.</param>
        /// <returns>The initialized gateway.</returns>
        public static async Task<RowItemGateway> CreateAsync(
            LogStorage logStorage,
            CancellationToken ct)
        {
            var cache = new RowItemInMemoryCache();

            await foreach (var chunk in logStorage.ReadLatestViewAsync(ct))
            {
                using (var stream = chunk.Stream)
                using (var reader = new StreamReader(stream))
                {
                    string? line;

                    //  One serialized item per line
                    while ((line = await reader.ReadLineAsync()) != null)
                    {
                        var item = _rowItemSerializer.Deserialize(line);

                        cache = cache.AppendItem(item);
                    }
                }
            }
            await logStorage.WriteLatestViewAsync(StreamCache(cache), ct);

            return new RowItemGateway(logStorage, cache, ct);
        }

        /// <summary>Creates the serializer with every supported row type registered.</summary>
        private static RowItemSerializer CreateRowItemSerializer()
        {
            return new RowItemSerializer()
                .AddType<ActivityRowItem>(RowType.Activity)
                .AddType<IterationRowItem>(RowType.Iteration)
                .AddType<TempTableRowItem>(RowType.TempTable)
                .AddType<BlockRowItem>(RowType.Block)
                .AddType<UrlRowItem>(RowType.Url)
                .AddType<ExtentRowItem>(RowType.Extent);
        }

        /// <summary>
        /// Lazily serializes every item of <paramref name="cache"/> into a flat
        /// sequence of UTF-8 bytes.
        /// </summary>
        private static IEnumerable<byte> StreamCache(RowItemInMemoryCache cache)
        {
            foreach (var item in cache.GetItems())
            {
                var text = _rowItemSerializer.Serialize(item);
                var buffer = Encoding.UTF8.GetBytes(text);

                foreach (var character in buffer)
                {
                    yield return character;
                }
            }
        }
        #endregion

        /// <summary>Current in-memory cache (replaced on every append).</summary>
        public RowItemInMemoryCache InMemoryCache => _inMemoryCache;

        /// <summary>
        /// Signals the background task to stop, waits for it to flush what is
        /// still queued and waits for pending task-source releases.
        /// </summary>
        async ValueTask IAsyncDisposable.DisposeAsync()
        {
            //  TrySetResult: disposing twice must not throw
            _backgroundCompletedSource.TrySetResult();
            await _backgroundTask;
            await Task.WhenAll(_releaseSourceTaskQueue);
        }

        /// <summary>Appends one item to the cache and queues it for persistence.</summary>
        public void Append(RowItemBase item)
        {
            AppendInternal(new[] { item }, null);
        }

        /// <summary>Appends items to the cache and queues them for persistence.</summary>
        public void Append(IEnumerable<RowItemBase> items)
        {
            AppendInternal(items, null);
        }

        /// <summary>
        /// Appends one item and returns a task completing once the item has been
        /// appended to the log.
        /// </summary>
        /// <param name="item">Item to append.</param>
        /// <param name="ct">Currently not used by this method.</param>
        public Task AppendAndPersistAsync(RowItemBase item, CancellationToken ct)
        {
            var taskSource = new TaskCompletionSource();

            AppendInternal(new[] { item }, taskSource);

            return taskSource.Task;
        }

        /// <summary>
        /// Appends items and completes once all of them have been appended to the
        /// log.  Completes immediately when <paramref name="items"/> is empty.
        /// </summary>
        /// <param name="items">Items to append.</param>
        /// <param name="ct">Currently not used by this method.</param>
        public async Task AppendAndPersistAsync(
            IEnumerable<RowItemBase> items,
            CancellationToken ct)
        {
            var materializedItems = items.ToImmutableArray();

            if (!materializedItems.IsEmpty)
            {
                var taskSource = new TaskCompletionSource();

                //  Pass the materialized items so the source is enumerated only once
                AppendInternal(materializedItems, taskSource);
                await taskSource.Task;
            }
        }

        /// <summary>
        /// Validates and serializes the items, then atomically updates the cache
        /// and enqueues the serialized items for the background task.
        /// </summary>
        /// <param name="items">Items to append.</param>
        /// <param name="taskSource">
        /// Optional source completed once the last item has been appended to the log.
        /// </param>
        private void AppendInternal(
            IEnumerable<RowItemBase> items,
            TaskCompletionSource? taskSource)
        {
            var materializedItems = items.ToImmutableArray();

            if (materializedItems.IsEmpty)
            {   //  Nothing will be queued, hence nothing would ever release the source
                taskSource?.TrySetResult();

                return;
            }

            var binaryItems = new List<byte[]>(materializedItems.Length);

            //  Validation and serialization are done outside the lock
            foreach (var item in materializedItems)
            {
                item.Validate();

                var text = _rowItemSerializer.Serialize(item);
                //  UTF-8, consistent with StreamCache and with the reader in CreateAsync
                //  (ASCII would turn any non-ASCII character into '?')
                var binaryItem = Encoding.UTF8.GetBytes(text);

                binaryItems.Add(binaryItem);
            }

            lock (_lock)
            {
                var newCache = _inMemoryCache;
                RowItemInMemoryCache? snapshot = null;

                foreach (var item in materializedItems)
                {
                    newCache = newCache.AppendItem(item);
                }
                _inMemoryCache = newCache;
                _volumeSinceLastSnapshot += binaryItems.Sum(b => (long)b.Length);
                if (_volumeSinceLastSnapshot > MAX_VOLUME_BEFORE_SNAPSHOT)
                {
                    snapshot = newCache;
                    _volumeSinceLastSnapshot = 0;
                }

                //  Enqueue inside the lock so the log order matches the cache order
                var enqueueTime = DateTime.UtcNow;
                var lastIndex = binaryItems.Count - 1;

                for (var i = 0; i <= lastIndex; ++i)
                {
                    //  Only the last item carries the snapshot and the task source:
                    //  a call can be split across two flushes and neither must
                    //  be acted upon before every item of the call is persisted
                    var isLast = i == lastIndex;

                    _rowItemQueue.Enqueue(new QueuedRowItem(
                        enqueueTime,
                        binaryItems[i],
                        isLast ? snapshot : null,
                        isLast ? taskSource : null));
                }
            }
        }

        /// <summary>
        /// Background loop:  flushes the queue once its oldest item is about
        /// <see cref="FLUSH_PERIOD"/> old, until disposal is signaled, then flushes
        /// what remains.
        /// </summary>
        private async Task BackgroundPersistanceAsync(CancellationToken ct)
        {
            while (!_backgroundCompletedSource.Task.IsCompleted)
            {
                if (_rowItemQueue.TryPeek(out var queueItem))
                {
                    var delta = DateTime.UtcNow - queueItem.EnqueueTime;
                    var waitTime = FLUSH_PERIOD - delta;

                    if (waitTime < MIN_WAIT_PERIOD)
                    {
                        await PersistBatchAsync(ct);
                    }
                    else
                    {   //  Wait for first item to age to about FLUSH_PERIOD
                        await Task.WhenAny(
                            Task.Delay(waitTime, ct),
                            _backgroundCompletedSource.Task);
                    }
                }
                else
                {   //  Wait for an element to pop in
                    await Task.WhenAny(
                        Task.Delay(FLUSH_PERIOD, ct),
                        _backgroundCompletedSource.Task);
                }
                await CleanReleaseSourceTaskQueueAsync();
            }
            //  Disposal was requested:  persist what is still queued so items aren't
            //  lost and callers awaiting persistence are released
            while (!_rowItemQueue.IsEmpty)
            {
                await PersistBatchAsync(ct);
            }
        }

        /// <summary>Removes (and observes) the completed release tasks at the head of the queue.</summary>
        private async Task CleanReleaseSourceTaskQueueAsync()
        {
            while (_releaseSourceTaskQueue.TryPeek(out var task) && task.IsCompleted)
            {
                if (_releaseSourceTaskQueue.TryDequeue(out var task2))
                {
                    await task2;
                }
            }
        }

        /// <summary>
        /// Dequeues items until the queue is empty or the next item would exceed
        /// the storage's maximum buffer size, appends them atomically to the log,
        /// releases the task sources attached to them and, if one of them carried
        /// a snapshot, writes the most recent one as latest view.
        /// </summary>
        /// <exception cref="InvalidDataException">
        /// Nothing could be put in the buffer (empty queue or first item larger
        /// than the maximum buffer size).
        /// </exception>
        private async Task PersistBatchAsync(CancellationToken ct)
        {
            using (var bufferStream = new MemoryStream())
            {
                var sources = new List<TaskCompletionSource>();
                RowItemInMemoryCache? lastCache = null;

                //  Fill the buffer as long as the next item fits
                while (_rowItemQueue.TryPeek(out var peekedItem)
                    && bufferStream.Length + peekedItem.Buffer.Length
                    <= _logStorage.MaxBufferSize)
                {
                    if (!_rowItemQueue.TryDequeue(out var queueItem))
                    {
                        throw new InvalidOperationException(
                            "We dequeue what we just peeked, this shouldn't fail");
                    }
                    lastCache = queueItem.SnapshotCache ?? lastCache;
                    bufferStream.Write(queueItem.Buffer);
                    if (queueItem.TaskSource != null)
                    {
                        sources.Add(queueItem.TaskSource);
                    }
                }

                //  Flush buffer stream
                if (bufferStream.Length == 0)
                {
                    throw new InvalidDataException("No buffer to append");
                }
                await _logStorage.AtomicAppendAsync(bufferStream.ToArray(), ct);
                //  Release tasks
                foreach (var source in sources)
                {   //  We run it on another thread not to block the persistance
                    _releaseSourceTaskQueue.Enqueue(Task.Run(() => source.TrySetResult()));
                }
                if (lastCache != null)
                {
                    await _logStorage.WriteLatestViewAsync(StreamCache(lastCache), ct);
                }
            }
        }
    }
}