in code/KustoCopyConsole/Runner/AwaitIngestRunner.cs [46:134]
/// <summary>
/// Finds queued blocks whose rows are fully present in their temp table and
/// persists, for each of them, its extents plus the block switched to
/// <c>BlockState.Ingested</c>.
/// </summary>
/// <param name="ct">Token flowed to the extent queries and to persistence.</param>
private async Task UpdateIngestedAsync(CancellationToken ct)
{
    //  Queued blocks (of non-completed activities / iterations) that already
    //  have a temp table, bucketed per temp table so each one is queried once
    var queuedBlocksByTempTable = RowItemGateway.InMemoryCache
        .GetActivityFlatHierarchy(
            a => a.RowItem.State != ActivityState.Completed,
            i => i.RowItem.State != IterationState.Completed)
        .Where(h => h.Block.State == BlockState.Queued && h.TempTable != null)
        .GroupBy(h => h.TempTable);
    var detectionTasks = queuedBlocksByTempTable
        .Select(group =>
        {
            //  Blocks sharing a temp table are taken to share the same
            //  activity & iteration, so any member can represent the group
            var representative = group.First();

            return DetectIngestedBlocksAsync(
                representative.Activity,
                representative.Iteration,
                group,
                group.Key!.TempTableName,
                ct);
        })
        .ToImmutableArray();

    await TaskHelper.WhenAllWithErrors(detectionTasks);

    var detectedItems = detectionTasks.SelectMany(t => t.Result);

    //  Wait for the "ingested" confirmation to be persisted before anything
    //  moves the extents:  if the process restarted after the extents moved
    //  but before this was persisted, the block would be left "queued"
    await RowItemGateway.AppendAndPersistAsync(detectedItems, ct);

    //  Queries the extents of one temp table and returns, for every block whose
    //  observed row count equals its expected row count, one ExtentRowItem per
    //  extent followed by the block changed to BlockState.Ingested.
    //  Throws a CopyException if a block shows more rows than expected.
    async Task<IEnumerable<RowItemBase>> DetectIngestedBlocksAsync(
        ActivityRowItem activity,
        IterationRowItem iteration,
        IEnumerable<ActivityFlatHierarchy> blocks,
        string tempTableName,
        CancellationToken cancellationToken)
    {
        var dbClient = DbClientFactory.GetDbCommandClient(
            activity.DestinationTable.ClusterUri,
            activity.DestinationTable.DatabaseName);
        var extents = await dbClient.GetExtentRowCountsAsync(
            new KustoPriority(iteration.GetIterationKey()),
            tempTableName,
            cancellationToken);
        //  Extents are bucketed by their Tags value, which is then looked up
        //  with each block's BlockTag
        var extentsByTag = extents
            .GroupBy(e => e.Tags)
            .ToImmutableDictionary(g => g.Key);
        var detected = new List<RowItemBase>();

        Trace.TraceInformation(
            $"AwaitIngest: {extents.Count} extents found with {extentsByTag.Count} tags");
        foreach (var hierarchy in blocks)
        {
            if (!extentsByTag.TryGetValue(
                hierarchy.Block.BlockTag,
                out var blockExtents))
            {
                //  No extent carries this block's tag yet
                continue;
            }

            var expectedRowCount = hierarchy.Urls.Sum(u => u.RowCount);
            var observedRowCount = blockExtents.Sum(e => e.RecordCount);

            if (observedRowCount > expectedRowCount)
            {
                throw new CopyException(
                    $"Target row count is {expectedRowCount} while " +
                    $"we observe {observedRowCount}",
                    false);
            }
            if (observedRowCount != expectedRowCount)
            {
                //  Not every expected row is visible yet:  check again later
                continue;
            }
            foreach (var extent in blockExtents)
            {
                detected.Add(new ExtentRowItem
                {
                    ActivityName = activity.ActivityName,
                    IterationId = iteration.IterationId,
                    BlockId = hierarchy.Block.BlockId,
                    ExtentId = extent.ExtentId,
                    RowCount = extent.RecordCount
                });
            }
            detected.Add(hierarchy.Block.ChangeState(BlockState.Ingested));
        }

        return detected;
    }
}