private async Task UpdateIngestedAsync()

in code/KustoCopyConsole/Runner/AwaitIngestRunner.cs [46:134]


        /// <summary>
        /// Scans the queued blocks of every activity / iteration that is not completed,
        /// compares the row counts of the extents found in each temp table with the row
        /// counts expected for each block and, for the blocks that match exactly, appends
        /// the extent items plus the block in the <see cref="BlockState.Ingested"/> state.
        /// </summary>
        /// <param name="ct">Cancellation token flowed to the queries and the persistence.</param>
        /// <exception cref="CopyException">
        /// Thrown when the extents of a block hold more rows than the block expects.
        /// </exception>
        private async Task UpdateIngestedAsync(CancellationToken ct)
        {
            //  Only queued blocks that already have a temp table are candidates
            var candidates = RowItemGateway.InMemoryCache
                .GetActivityFlatHierarchy(
                    a => a.RowItem.State != ActivityState.Completed,
                    i => i.RowItem.State != IterationState.Completed)
                .Where(h => h.Block.State == BlockState.Queued)
                .Where(h => h.TempTable != null);
            //  One extent query per temp table:  every block of a group shares that table
            var detectionTasks = candidates
                .GroupBy(h => h.TempTable)
                .Select(tableGroup =>
                {
                    var representative = tableGroup.First();

                    return CollectIngestedItemsAsync(
                        representative.Activity,
                        representative.Iteration,
                        tableGroup,
                        tableGroup.Key!.TempTableName,
                        ct);
                })
                .ToImmutableArray();

            await TaskHelper.WhenAllWithErrors(detectionTasks);

            //  The tasks were awaited just above, hence reading their result here
            var itemsToPersist = detectionTasks.SelectMany(t => t.Result);

            //  The "ingested" status must be persisted before anything moves on:
            //  if extents were moved before that confirmation is persisted, a
            //  process restart would leave the block stuck in "queued"
            await RowItemGateway.AppendAndPersistAsync(itemsToPersist, ct);

            //  Queries the extents of one temp table and returns the row items
            //  (extents + ingested block) of each block that is fully accounted for
            async Task<IEnumerable<RowItemBase>> CollectIngestedItemsAsync(
                ActivityRowItem activity,
                IterationRowItem iteration,
                IEnumerable<ActivityFlatHierarchy> items,
                string tempTableName,
                CancellationToken token)
            {
                var destinationTable = activity.DestinationTable;
                var commandClient = DbClientFactory.GetDbCommandClient(
                    destinationTable.ClusterUri,
                    destinationTable.DatabaseName);
                var extentCounts = await commandClient.GetExtentRowCountsAsync(
                    new KustoPriority(iteration.GetIterationKey()),
                    tempTableName,
                    token);
                //  Extents are matched to blocks through their tags
                var extentsByTag = extentCounts
                    .GroupBy(e => e.Tags)
                    .ToImmutableDictionary(g => g.Key);
                var detectedItems = new List<RowItemBase>();

                Trace.TraceInformation(
                    $"AwaitIngest:  {extentCounts.Count} "
                    + $"extents found with {extentsByTag.Count} tags");
                foreach (var item in items)
                {
                    if (!extentsByTag.TryGetValue(item.Block.BlockTag, out var blockExtents))
                    {   //  No extent carries this block's tag (yet)
                        continue;
                    }

                    var expectedRowCount = item.Urls.Sum(u => u.RowCount);
                    var observedRowCount = blockExtents.Sum(e => e.RecordCount);

                    if (observedRowCount > expectedRowCount)
                    {   //  More rows than what was expected for the block
                        throw new CopyException(
                            $"Target row count is {expectedRowCount} while "
                            + $"we observe {observedRowCount}",
                            false);
                    }
                    if (observedRowCount != expectedRowCount)
                    {   //  Fewer rows than expected:  leave the block queued
                        continue;
                    }

                    //  Exact match:  record each extent, then the block as ingested
                    foreach (var extent in blockExtents)
                    {
                        detectedItems.Add(new ExtentRowItem
                        {
                            ActivityName = activity.ActivityName,
                            IterationId = iteration.IterationId,
                            BlockId = item.Block.BlockId,
                            ExtentId = extent.ExtentId,
                            RowCount = extent.RecordCount
                        });
                    }
                    detectedItems.Add(item.Block.ChangeState(BlockState.Ingested));
                }

                return detectedItems;
            }
        }