in code/KustoCopyConsole/Runner/PlanningRunner.cs [252:302]
private async Task PlanBlocksAsync(
DbQueryClient queryClient,
DbCommandClient dbCommandClient,
IterationRowItem iterationItem,
CancellationToken ct)
{
var protoBlocks = ProtoBlockCollection.Empty;
// Loop on block batches
while (iterationItem.State == IterationState.Planning)
{
var blockMap = RowItemGateway.InMemoryCache
.ActivityMap[iterationItem.ActivityName]
.IterationMap[iterationItem.IterationId]
.BlockMap;
var lastBlock = blockMap.Any()
? blockMap.Values.ArgMax(b => b.RowItem.BlockId).RowItem
: null;
var newProtoBlocks = await GetProtoBlockAsync(
iterationItem,
protoBlocks.LastIngestionTimeEnd() ?? lastBlock?.IngestionTimeEnd,
queryClient,
dbCommandClient,
ct);
protoBlocks = protoBlocks.Add(newProtoBlocks);
Trace.TraceInformation($"Planning {iterationItem.GetIterationKey()}: " +
$"{newProtoBlocks.Count} new protoblocks compacted into " +
$"{protoBlocks.PopCompletedBlocks(false).Blocks.Count()} blocks");
protoBlocks = PlanBlockBatch(
protoBlocks,
!newProtoBlocks.Any(),
iterationItem.ActivityName,
iterationItem.IterationId,
lastBlock?.BlockId ?? 0);
if (!newProtoBlocks.Any())
{
var isAnyBlock = RowItemGateway.InMemoryCache
.ActivityMap[iterationItem.ActivityName]
.IterationMap[iterationItem.IterationId]
.BlockMap
.Any();
iterationItem = isAnyBlock
? iterationItem.ChangeState(IterationState.Planned)
: iterationItem.ChangeState(IterationState.Completed);
RowItemGateway.Append(iterationItem);
}
}
}