private async Task CompleteOperationsAsync()

in code/KustoCopyConsole/Runner/AwaitExportedRunner.cs [153:201]


        private async Task CompleteOperationsAsync(
            IImmutableDictionary<string, BlockRowItem> operationIdMap,
            IImmutableList<ExportOperationStatus> statuses,
            CancellationToken ct)
        {
            async Task ProcessOperationAsync(
                ExportOperationStatus status,
                BlockRowItem block,
                CancellationToken ct)
            {
                var activity =
                    RowItemGateway.InMemoryCache.ActivityMap[block.ActivityName].RowItem;
                var dbClient = DbClientFactory.GetDbCommandClient(
                    activity.SourceTable.ClusterUri,
                    activity.SourceTable.DatabaseName);
                var details = await dbClient.ShowExportDetailsAsync(
                    new KustoPriority(block.GetBlockKey()),
                    status.OperationId,
                    ct);
                var urls = details
                    .Select(d => new UrlRowItem
                    {
                        State = UrlState.Exported,
                        ActivityName = block.ActivityName,
                        IterationId = block.IterationId,
                        BlockId = block.BlockId,
                        Url = d.BlobUri.ToString(),
                        RowCount = d.RecordCount
                    });
                var newBlock = block.ChangeState(BlockState.Exported);

                newBlock.ReplannedBlockIds = block.ReplannedBlockIds.Clear();
                newBlock.ExportOperationId = string.Empty;
                newBlock.ExportDuration = status.Duration;
                newBlock.ExportedRowCount = details.Sum(d => d.RecordCount);
                Trace.TraceInformation($"Exported block {block.GetBlockKey()}:  {urls.Count()} " +
                    $"urls in {newBlock.ExportDuration}");
                RowItemGateway.Append(urls);
                RowItemGateway.Append(newBlock);
                ValidatePlannedRowCount(block);
            }

            var tasks = statuses
                .Where(s => s.State == "Completed")
                .Select(s => ProcessOperationAsync(s, operationIdMap[s.OperationId], ct))
                .ToImmutableArray();

            await TaskHelper.WhenAllWithErrors(tasks);
        }