in code/KustoCopyConsole/Storage/RowItemGateway.cs [146:183]
private void AppendInternal(
IEnumerable<RowItemBase> items,
TaskCompletionSource? TaskSource)
{
var materializedItems = items.ToImmutableArray();
var binaryItems = new List<byte[]>();
RowItemInMemoryCache? snapshot = null;
foreach (var item in materializedItems)
{
item.Validate();
var text = _rowItemSerializer.Serialize(item);
var binaryItem = ASCIIEncoding.ASCII.GetBytes(text);
binaryItems.Add(binaryItem);
}
lock (_lock)
{
var newCache = _inMemoryCache;
foreach (var item in materializedItems)
{
newCache = newCache.AppendItem(item);
}
Interlocked.Exchange(ref _inMemoryCache, newCache);
Interlocked.Add(ref _volumeSinceLastSnapshot, binaryItems.Sum(i => i.Length));
if (_volumeSinceLastSnapshot > MAX_VOLUME_BEFORE_SNAPSHOT)
{
snapshot = newCache;
Interlocked.Exchange(ref _volumeSinceLastSnapshot, 0);
}
}
foreach (var binaryItem in binaryItems)
{
_rowItemQueue.Enqueue(new QueuedRowItem(DateTime.Now, binaryItem, snapshot, TaskSource));
}
}