private async Task PersistBatchAsync()

in code/KustoCopyConsole/Storage/RowItemGateway.cs [226:275]


        private async Task PersistBatchAsync(CancellationToken ct)
        {
            using (var bufferStream = new MemoryStream())
            {
                var sources = new List<TaskCompletionSource>();
                RowItemInMemoryCache? lastCache = null;

                while (true)
                {
                    if (!_rowItemQueue.TryPeek(out var queueItem)
                        || bufferStream.Length + queueItem.Buffer.Length
                            > _logStorage.MaxBufferSize)
                    {   //  Flush buffer stream
                        if (bufferStream.Length == 0)
                        {
                            throw new InvalidDataException("No buffer to append");
                        }
                        await _logStorage.AtomicAppendAsync(bufferStream.ToArray(), ct);
                        //  Release tasks
                        foreach (var source in sources)
                        {   //  We run it on another thread not to block the persistance
                            _releaseSourceTaskQueue.Enqueue(Task.Run(() => source.TrySetResult()));
                        }
                        if (lastCache != null)
                        {
                            await _logStorage.WriteLatestViewAsync(StreamCache(lastCache), ct);
                        }

                        return;
                    }
                    else
                    {   //  Append to buffer stream
                        if (_rowItemQueue.TryDequeue(out queueItem))
                        {
                            lastCache = queueItem.SnapshotCache ?? lastCache;
                            bufferStream.Write(queueItem.Buffer);
                            if (queueItem.TaskSource != null)
                            {
                                sources.Add(queueItem.TaskSource);
                            }
                        }
                        else
                        {
                            throw new InvalidOperationException(
                                "We dequeue what we just peeked, this shouldn't fail");
                        }
                    }
                }
            }
        }