in code/KustoCopyConsole/Storage/RowItemGateway.cs [226:275]
private async Task PersistBatchAsync(CancellationToken ct)
{
using (var bufferStream = new MemoryStream())
{
var sources = new List<TaskCompletionSource>();
RowItemInMemoryCache? lastCache = null;
while (true)
{
if (!_rowItemQueue.TryPeek(out var queueItem)
|| bufferStream.Length + queueItem.Buffer.Length
> _logStorage.MaxBufferSize)
{ // Flush buffer stream
if (bufferStream.Length == 0)
{
throw new InvalidDataException("No buffer to append");
}
await _logStorage.AtomicAppendAsync(bufferStream.ToArray(), ct);
// Release tasks
foreach (var source in sources)
{ // We run it on another thread not to block the persistance
_releaseSourceTaskQueue.Enqueue(Task.Run(() => source.TrySetResult()));
}
if (lastCache != null)
{
await _logStorage.WriteLatestViewAsync(StreamCache(lastCache), ct);
}
return;
}
else
{ // Append to buffer stream
if (_rowItemQueue.TryDequeue(out queueItem))
{
lastCache = queueItem.SnapshotCache ?? lastCache;
bufferStream.Write(queueItem.Buffer);
if (queueItem.TaskSource != null)
{
sources.Add(queueItem.TaskSource);
}
}
else
{
throw new InvalidOperationException(
"We dequeue what we just peeked, this shouldn't fail");
}
}
}
}
}