in code/KustoCopyConsole/Runner/AwaitExportedRunner.cs [153:201]
/// <summary>
/// Processes every export operation whose status state is <c>"Completed"</c>:
/// fetches the export details, appends one <see cref="UrlRowItem"/> per exported
/// blob and appends the owning block transitioned to
/// <see cref="BlockState.Exported"/>.
/// </summary>
/// <param name="operationIdMap">
/// Lookup from export operation ID to the block that issued the operation.
/// Every completed status must have an entry; the indexer throws otherwise.
/// </param>
/// <param name="statuses">Export operation statuses to inspect.</param>
/// <param name="ct">Token forwarded to the export-details query.</param>
private async Task CompleteOperationsAsync(
    IImmutableDictionary<string, BlockRowItem> operationIdMap,
    IImmutableList<ExportOperationStatus> statuses,
    CancellationToken ct)
{
    // Handles a single completed operation for the given block.
    async Task ProcessOperationAsync(
        ExportOperationStatus status,
        BlockRowItem block,
        CancellationToken ct)
    {
        var activity =
            RowItemGateway.InMemoryCache.ActivityMap[block.ActivityName].RowItem;
        var dbClient = DbClientFactory.GetDbCommandClient(
            activity.SourceTable.ClusterUri,
            activity.SourceTable.DatabaseName);
        var details = await dbClient.ShowExportDetailsAsync(
            new KustoPriority(block.GetBlockKey()),
            status.OperationId,
            ct);
        // Materialize once:  the projection is both counted (trace message) and
        // appended below; a deferred query would be re-run for each use and
        // allocate a distinct set of UrlRowItem instances every time.
        var urls = details
            .Select(d => new UrlRowItem
            {
                State = UrlState.Exported,
                ActivityName = block.ActivityName,
                IterationId = block.IterationId,
                BlockId = block.BlockId,
                Url = d.BlobUri.ToString(),
                RowCount = d.RecordCount
            })
            .ToImmutableList();
        var newBlock = block.ChangeState(BlockState.Exported);

        // The export operation is over:  reset the operation-tracking fields and
        // record the outcome on the new block state.
        newBlock.ReplannedBlockIds = block.ReplannedBlockIds.Clear();
        newBlock.ExportOperationId = string.Empty;
        newBlock.ExportDuration = status.Duration;
        newBlock.ExportedRowCount = details.Sum(d => d.RecordCount);
        Trace.TraceInformation($"Exported block {block.GetBlockKey()}: {urls.Count} " +
            $"urls in {newBlock.ExportDuration}");
        RowItemGateway.Append(urls);
        RowItemGateway.Append(newBlock);
        // NOTE(review): validation receives the original block while
        // ExportedRowCount was just assigned on newBlock -- confirm whether
        // newBlock was intended here.
        ValidatePlannedRowCount(block);
    }

    // Start one task per completed operation, then await them all together.
    var tasks = statuses
        .Where(s => s.State == "Completed")
        .Select(s => ProcessOperationAsync(s, operationIdMap[s.OperationId], ct))
        .ToImmutableArray();

    await TaskHelper.WhenAllWithErrors(tasks);
}