code/KustoCopyConsole/Runner/AwaitExportedRunner.cs (200 lines of code) (raw):

using Azure.Core; using KustoCopyConsole.Entity.RowItems; using KustoCopyConsole.Entity.State; using KustoCopyConsole.JobParameter; using KustoCopyConsole.Kusto; using KustoCopyConsole.Kusto.Data; using KustoCopyConsole.Storage; using System; using System.Collections.Immutable; using System.Diagnostics; using System.Linq; namespace KustoCopyConsole.Runner { internal class AwaitExportedRunner : RunnerBase { #region Inner Types private record ClusterBlocks(Uri ClusterUri, IEnumerable<BlockRowItem> BlockItems); #endregion private const int MAX_OPERATIONS = 200; private static readonly IImmutableSet<string> FAILED_STATUS = ImmutableHashSet.Create( [ "Throttled", "Failed", "PartiallySucceeded", "Abandoned", "BadInput", "Canceled", "Skipped" ]); public AwaitExportedRunner( MainJobParameterization parameterization, TokenCredential credential, RowItemGateway rowItemGateway, DbClientFactory dbClientFactory, IStagingBlobUriProvider stagingBlobUriProvider) : base( parameterization, credential, rowItemGateway, dbClientFactory, stagingBlobUriProvider, TimeSpan.FromSeconds(15)) { } public async Task RunAsync(CancellationToken ct) { while (!AllActivitiesCompleted()) { var clusterBlocks = GetClusterBlocks(); var tasks = clusterBlocks .Select(o => UpdateOperationsAsync(o.ClusterUri, o.BlockItems, ct)) .ToImmutableArray(); await TaskHelper.WhenAllWithErrors(tasks); // Sleep await SleepAsync(ct); } } private IEnumerable<ClusterBlocks> GetClusterBlocks() { var hierarchy = RowItemGateway.InMemoryCache.GetActivityFlatHierarchy( a => a.RowItem.State != ActivityState.Completed, i => i.RowItem.State != IterationState.Completed); var exportingBlocks = hierarchy .Where(h => h.Block.State == BlockState.Exporting); var clusterBlocks = exportingBlocks .GroupBy(h => h.Activity.SourceTable.ClusterUri) .Select(g => new ClusterBlocks( g.Key, g.Select(h => h.Block).OrderBy(b => b.Updated).Take(MAX_OPERATIONS))); return clusterBlocks; } private async Task UpdateOperationsAsync( Uri clusterUri, IEnumerable<BlockRowItem> blockItems, CancellationToken ct) { var dbClient = DbClientFactory.GetDbCommandClient(clusterUri, string.Empty); var operationIdMap = blockItems .ToImmutableDictionary(b => b.ExportOperationId); var statuses = await dbClient.ShowOperationsAsync( KustoPriority.HighestPriority, operationIdMap.Keys, ct); DetectLostOperationIds(operationIdMap, statuses); DetectFailures(operationIdMap, statuses); await CompleteOperationsAsync(operationIdMap, statuses, ct); } #region Handle Operations private void DetectLostOperationIds( IImmutableDictionary<string, BlockRowItem> operationIdMap, IImmutableList<ExportOperationStatus> status) { var statusOperationIdBag = status.Select(s => s.OperationId).ToHashSet(); foreach (var id in operationIdMap.Keys) { if (!statusOperationIdBag.Contains(id)) { var block = operationIdMap[id]; TraceWarning($"Warning! Operation ID lost: '{id}' for " + $"block {block.BlockId} (Iteration={block.IterationId}, " + $"Activity='{block.ActivityName}') ; block marked for reprocessing"); block.ExportOperationId = string.Empty; block.ChangeState(BlockState.Planned); RowItemGateway.Append(block); } } } private void DetectFailures( IImmutableDictionary<string, BlockRowItem> operationIdMap, IImmutableList<ExportOperationStatus> statuses) { var failedStatuses = statuses .Where(s => FAILED_STATUS.Contains(s.State)); foreach (var status in failedStatuses) { var block = operationIdMap[status.OperationId]; var message = status.ShouldRetry ? "block marked for reprocessing" : "block can't be re-exported"; var warning = $"Warning! Operation ID in state '{status.State}', " + $"status '{status.Status}' " + $"block {block.BlockId} (Iteration={block.IterationId}, " + $"Activity='{block.ActivityName}') ; {message}"; TraceWarning(warning); if (status.ShouldRetry) { block.ExportOperationId = string.Empty; block = block.ChangeState(BlockState.Planned); RowItemGateway.Append(block); } else { throw new CopyException($"Permanent export error", false); } } } private async Task CompleteOperationsAsync( IImmutableDictionary<string, BlockRowItem> operationIdMap, IImmutableList<ExportOperationStatus> statuses, CancellationToken ct) { async Task ProcessOperationAsync( ExportOperationStatus status, BlockRowItem block, CancellationToken ct) { var activity = RowItemGateway.InMemoryCache.ActivityMap[block.ActivityName].RowItem; var dbClient = DbClientFactory.GetDbCommandClient( activity.SourceTable.ClusterUri, activity.SourceTable.DatabaseName); var details = await dbClient.ShowExportDetailsAsync( new KustoPriority(block.GetBlockKey()), status.OperationId, ct); var urls = details .Select(d => new UrlRowItem { State = UrlState.Exported, ActivityName = block.ActivityName, IterationId = block.IterationId, BlockId = block.BlockId, Url = d.BlobUri.ToString(), RowCount = d.RecordCount }); var newBlock = block.ChangeState(BlockState.Exported); newBlock.ReplannedBlockIds = block.ReplannedBlockIds.Clear(); newBlock.ExportOperationId = string.Empty; newBlock.ExportDuration = status.Duration; newBlock.ExportedRowCount = details.Sum(d => d.RecordCount); Trace.TraceInformation($"Exported block {block.GetBlockKey()}: {urls.Count()} " + $"urls in {newBlock.ExportDuration}"); RowItemGateway.Append(urls); RowItemGateway.Append(newBlock); ValidatePlannedRowCount(block); } var tasks = statuses .Where(s => s.State == "Completed") .Select(s => ProcessOperationAsync(s, operationIdMap[s.OperationId], ct)) .ToImmutableArray(); await TaskHelper.WhenAllWithErrors(tasks); } private void ValidatePlannedRowCount(BlockRowItem block) { var cachedBlock = RowItemGateway.InMemoryCache .ActivityMap[block.ActivityName] .IterationMap[block.IterationId] .BlockMap[block.BlockId]; var exportedRowCount = cachedBlock.UrlMap.Values.Sum(u => u.RowItem.RowCount); if (cachedBlock.RowItem.PlannedRowCount != exportedRowCount) { TraceWarning($"Warning! For block ID {block.BlockId} " + $"(activity '{block.ActivityName}', iteration {block.IterationId}) " + $"had planned row count of {cachedBlock.RowItem.PlannedRowCount} but " + $"exported {exportedRowCount} rows"); } } #endregion } }