in code/KustoCopyConsole/Runner/AwaitIngestRunner.cs [223:269]
/// <summary>
/// Moves the extents of the given blocks into the activity's destination table,
/// one batch at a time.  After each batch, the blocks' tags are cleaned on the
/// destination table and every block of the batch is appended to the row item
/// gateway in the <see cref="BlockState.ExtentMoved"/> state with an empty tag.
/// </summary>
/// <param name="activity">
/// Activity whose destination table (cluster URI, database and table name) is targeted.
/// </param>
/// <param name="iteration">Iteration whose key is used to build the command priority.</param>
/// <param name="tempTableName">
/// Table name passed to <c>MoveExtentsAsync</c> ahead of the destination table name;
/// presumably the table the extents are moved from (confirm against the command client).
/// </param>
/// <param name="items">
/// Blocks, with their extents, to process.  They are handled in ascending block-id order.
/// </param>
/// <param name="ct">Cancellation token forwarded to the commands.</param>
/// <exception cref="InvalidOperationException">
/// Thrown when no block can be selected for a batch although blocks remain,
/// which would otherwise make the loop spin forever.
/// </exception>
private async Task MoveBlocksFromIterationAsync(
    ActivityRowItem activity,
    IterationRowItem iteration,
    string tempTableName,
    IEnumerable<ActivityFlatHierarchy> items,
    CancellationToken ct)
{
    var commandClient = DbClientFactory.GetDbCommandClient(
        activity.DestinationTable.ClusterUri,
        activity.DestinationTable.DatabaseName);
    var priority = new KustoPriority(iteration.GetIterationKey());
    var sortedItems = items
        .OrderBy(i => i.Block.BlockId)
        .ToImmutableArray();

    while (!sortedItems.IsEmpty)
    {
        // Materialize the batch once:  it is read several times below and the
        // selection helper may return a lazily evaluated sequence.
        var movingItems = TakeMovingBlocks(sortedItems).ToImmutableArray();

        if (movingItems.IsEmpty)
        {
            // Without progress, the same remaining items would be retried endlessly.
            throw new InvalidOperationException(
                $"No block could be selected for moving although {sortedItems.Length} remain");
        }

        var movingExtentIds = movingItems
            .SelectMany(i => i.Extents.Select(e => e.ExtentId));
        var tags = movingItems
            .Select(i => i.Block.BlockTag);

        await commandClient.MoveExtentsAsync(
            priority,
            tempTableName,
            activity.DestinationTable.TableName,
            movingExtentIds,
            ct);
        await commandClient.CleanExtentTagsAsync(
            priority,
            activity.DestinationTable.TableName,
            tags,
            ct);

        var newBlockItems = movingItems
            .Select(i => i.Block.ChangeState(BlockState.ExtentMoved))
            .ToImmutableArray();

        // Tags were just cleaned on the destination table, so the persisted
        // block items no longer carry one.
        foreach (var item in newBlockItems)
        {
            item.BlockTag = string.Empty;
        }
        RowItemGateway.Append(newBlockItems);

        // Drop the batch that was just processed and continue with the rest.
        sortedItems = sortedItems
            .Skip(movingItems.Length)
            .ToImmutableArray();
    }
}