code/KustoCopyConsole/Runner/AwaitIngestRunner.cs (259 lines of code) (raw):

using Azure.Core;
using KustoCopyConsole.Entity.InMemory;
using KustoCopyConsole.Entity.RowItems;
using KustoCopyConsole.Entity.State;
using KustoCopyConsole.JobParameter;
using KustoCopyConsole.Kusto;
using KustoCopyConsole.Storage;
using System.Collections.Immutable;
using System.Diagnostics;

namespace KustoCopyConsole.Runner
{
    /// <summary>
    /// Runner looping over the blocks in <see cref="BlockState.Queued"/> state:
    /// it detects when the extents of a block are fully present in the temp table
    /// (block becomes <see cref="BlockState.Ingested"/>), detects ingestion failures
    /// (block is sent back to <see cref="BlockState.Planned"/>) and moves the extents
    /// of ingested blocks from the temp table to the destination table
    /// (block becomes <see cref="BlockState.ExtentMoved"/>).
    /// </summary>
    internal class AwaitIngestRunner : RunnerBase
    {
        /// <summary>
        /// Upper bound on the cumulated number of extents of the blocks sent in one
        /// "move extents" command (a single block is always moved, even if it alone
        /// exceeds that bound ; see <see cref="TakeMovingBlocks"/>).
        /// </summary>
        private const int MAXIMUM_EXTENT_MOVING = 100;

        public AwaitIngestRunner(
            MainJobParameterization parameterization,
            TokenCredential credential,
            RowItemGateway rowItemGateway,
            DbClientFactory dbClientFactory,
            IStagingBlobUriProvider stagingBlobUriProvider)
            : base(
                  parameterization,
                  credential,
                  rowItemGateway,
                  dbClientFactory,
                  stagingBlobUriProvider,
                  //  NOTE(review):  presumably the wake-up period used by SleepAsync
                  //  -- confirm in RunnerBase
                  TimeSpan.FromSeconds(15))
        {
        }

        /// <summary>
        /// Main loop:  until all activities are completed, detect ingested blocks,
        /// detect failed ingestions, move ingested extents, then sleep.
        /// </summary>
        /// <param name="ct">Cancellation token flowed to every step.</param>
        public async Task RunAsync(CancellationToken ct)
        {
            while (!AllActivitiesCompleted())
            {
                await UpdateIngestedAsync(ct);
                await FailureDetectionAsync(ct);
                await MoveAsync(ct);

                //  Sleep
                await SleepAsync(ct);
            }
        }

        /// <summary>
        /// For each temp table having queued blocks, compares the row count of the
        /// extents found in the temp table (grouped by tags) with the row count of
        /// the block's URLs.  Blocks with matching counts get their extents recorded
        /// and are moved to <see cref="BlockState.Ingested"/>.
        /// </summary>
        /// <exception cref="CopyException">
        /// Thrown when more rows are observed for a block than the block's URLs account for.
        /// </exception>
        private async Task UpdateIngestedAsync(CancellationToken ct)
        {
            //  Builds the row items (extents + block state change) for the blocks of
            //  one temp table that are fully ingested
            async Task<IEnumerable<RowItemBase>> GetIngestionItemsAsync(
                ActivityRowItem activity,
                IterationRowItem iteration,
                IEnumerable<ActivityFlatHierarchy> items,
                string tempTableName,
                CancellationToken ct)
            {
                var dbClient = DbClientFactory.GetDbCommandClient(
                    activity.DestinationTable.ClusterUri,
                    activity.DestinationTable.DatabaseName);
                var allExtentRowCounts = await dbClient.GetExtentRowCountsAsync(
                    new KustoPriority(iteration.GetIterationKey()),
                    tempTableName,
                    ct);
                var extentRowCountByTags = allExtentRowCounts
                    .GroupBy(e => e.Tags)
                    .ToImmutableDictionary(g => g.Key);
                var ingestionItems = new List<RowItemBase>();

                Trace.TraceInformation($"AwaitIngest: {allExtentRowCounts.Count} " +
                    $"extents found with {extentRowCountByTags.Count} tags");
                foreach (var item in items)
                {
                    //  A block without any extent yet is simply left as queued
                    if (extentRowCountByTags.TryGetValue(
                        item.Block.BlockTag,
                        out var extentRowCounts))
                    {
                        var targetRowCount = item
                            .Urls
                            .Sum(h => h.RowCount);
                        var blockExtentRowCount = extentRowCounts
                            .Sum(e => e.RecordCount);

                        if (blockExtentRowCount > targetRowCount)
                        {
                            throw new CopyException(
                                $"Target row count is {targetRowCount} while " +
                                $"we observe {blockExtentRowCount}",
                                false);
                        }
                        //  Fewer rows than the target:  ingestion is still in progress
                        if (blockExtentRowCount == targetRowCount)
                        {
                            var extentItems = extentRowCounts
                                .Select(e => new ExtentRowItem
                                {
                                    ActivityName = activity.ActivityName,
                                    IterationId = iteration.IterationId,
                                    BlockId = item.Block.BlockId,
                                    ExtentId = e.ExtentId,
                                    RowCount = e.RecordCount
                                });

                            ingestionItems.AddRange(extentItems);
                            ingestionItems.Add(item.Block.ChangeState(BlockState.Ingested));
                        }
                    }
                }

                return ingestionItems;
            }

            var allBlocks = RowItemGateway.InMemoryCache.GetActivityFlatHierarchy(
                a => a.RowItem.State != ActivityState.Completed,
                i => i.RowItem.State != IterationState.Completed);
            var queuedBlocks = allBlocks
                .Where(h => h.Block.State == BlockState.Queued);
            //  One query per temp table, all started before any is awaited
            var detectIngestionTasks = queuedBlocks
                .Where(h => h.TempTable != null)
                .GroupBy(h => h.TempTable)
                .Select(g => GetIngestionItemsAsync(
                    g.First().Activity,
                    g.First().Iteration,
                    g,
                    g.Key!.TempTableName,
                    ct))
                .ToImmutableArray();

            await TaskHelper.WhenAllWithErrors(detectIngestionTasks);

            //  Tasks were awaited just above:  reading Result here happens after that await
            var ingestionItems = detectIngestionTasks
                .SelectMany(t => t.Result);

            //  We do wait for the ingested status to persist before moving
            //  This is to avoid moving extents before the confirmation of
            //  ingestion is persisted:  this would result in the block
            //  staying in "queued" if the process would restart
            await RowItemGateway.AppendAndPersistAsync(ingestionItems, ct);
        }

        /// <summary>
        /// Runs <see cref="IterationFailureDetectionAsync"/> on every non-completed
        /// iteration of every non-completed activity.
        /// </summary>
        private async Task FailureDetectionAsync(CancellationToken ct)
        {
            var activeIterations = RowItemGateway.InMemoryCache
                .ActivityMap
                .Values
                .Where(a => a.RowItem.State != ActivityState.Completed)
                .SelectMany(a => a.IterationMap.Values)
                .Where(i => i.RowItem.State != IterationState.Completed);
            var iterationTasks = activeIterations
                .Select(i => IterationFailureDetectionAsync(i, ct))
                .ToImmutableArray();

            await TaskHelper.WhenAllWithErrors(iterationTasks);
        }

        /// <summary>
        /// Looks at the queued block of the iteration with the smallest
        /// <c>Updated</c> value and fetches the ingestion failure of each of its URLs.
        /// On the first failure found, a warning is traced and the block is sent
        /// back to <see cref="BlockState.Planned"/>.
        /// </summary>
        /// <param name="iterationCache">Iteration to inspect.</param>
        /// <param name="ct">Cancellation token (not used by the current implementation).</param>
        private async Task IterationFailureDetectionAsync(
            IterationCache iterationCache,
            CancellationToken ct)
        {
            var queuedBlocks = iterationCache
                .BlockMap
                .Values
                .Where(b => b.RowItem.State == BlockState.Queued);

            if (queuedBlocks.Any())
            {
                var activityItem = RowItemGateway.InMemoryCache
                    .ActivityMap[iterationCache.RowItem.ActivityName]
                    .RowItem;
                //  NOTE(review):  assumes an iteration with queued blocks always has
                //  a temp table -- confirm
                var ingestClient = DbClientFactory.GetIngestClient(
                    activityItem.DestinationTable.ClusterUri,
                    activityItem.DestinationTable.DatabaseName,
                    iterationCache.TempTable!.TempTableName);
                //  Only one block is probed per pass
                var oldestBlock = queuedBlocks
                    .ArgMin(b => b.RowItem.Updated);

                foreach (var urlItem in oldestBlock.UrlMap.Values.Select(u => u.RowItem))
                {
                    var failure = await ingestClient.FetchIngestionFailureAsync(
                        urlItem.SerializedQueuedResult);

                    if (failure != null)
                    {
                        TraceWarning(
                            $"Warning!  Ingestion failed with status '{failure.Status}' " +
                            $"and detail '{failure.Details}' for blob {urlItem.Url} in block " +
                            $"{oldestBlock.RowItem.BlockId}, iteration " +
                            $"{oldestBlock.RowItem.IterationId}, activity " +
                            $"{oldestBlock.RowItem.ActivityName} ; block will be re-exported");
                        ReturnToPlanned(oldestBlock);

                        //  One failure is enough to send the whole block back
                        return;
                    }
                }
            }
        }

        /// <summary>
        /// Appends a new version of the block in <see cref="BlockState.Planned"/>
        /// state with its export operation id and block tag cleared.
        /// </summary>
        private void ReturnToPlanned(BlockCache oldestBlock)
        {
            var newBlock = oldestBlock.RowItem.ChangeState(BlockState.Planned);

            newBlock.ExportOperationId = string.Empty;
            newBlock.BlockTag = string.Empty;
            RowItemGateway.Append(newBlock);
        }

        /// <summary>
        /// Groups the ingested blocks by iteration and moves their extents with
        /// <see cref="MoveBlocksFromIterationAsync"/>, one task per iteration.
        /// </summary>
        private async Task MoveAsync(CancellationToken ct)
        {
            var allBlocks = RowItemGateway.InMemoryCache.GetActivityFlatHierarchy(
                a => a.RowItem.State != ActivityState.Completed,
                i => i.RowItem.State != IterationState.Completed);
            var ingestedBlocks = allBlocks
                .Where(h => h.Block.State == BlockState.Ingested);
            var moveTasks = ingestedBlocks
                .GroupBy(h => h.Iteration.GetIterationKey())
                .Select(g => MoveBlocksFromIterationAsync(
                    g.First().Activity,
                    g.First().Iteration,
                    g.First().TempTable!.TempTableName,
                    g,
                    ct))
                .ToImmutableArray();

            Trace.TraceInformation($"AwaitIngest: {moveTasks.Length} extent moving commands");
            await TaskHelper.WhenAllWithErrors(moveTasks);
        }

        /// <summary>
        /// Moves the extents of the given ingested blocks from the temp table to the
        /// destination table, in block-id order and in batches bounded by
        /// <see cref="MAXIMUM_EXTENT_MOVING"/>.  After each batch, the block tags are
        /// cleaned on the destination table and the blocks are appended in
        /// <see cref="BlockState.ExtentMoved"/> state with an empty tag.
        /// </summary>
        private async Task MoveBlocksFromIterationAsync(
            ActivityRowItem activity,
            IterationRowItem iteration,
            string tempTableName,
            IEnumerable<ActivityFlatHierarchy> items,
            CancellationToken ct)
        {
            var commandClient = DbClientFactory.GetDbCommandClient(
                activity.DestinationTable.ClusterUri,
                activity.DestinationTable.DatabaseName);
            var priority = new KustoPriority(iteration.GetIterationKey());
            var sortedItems = items
                .OrderBy(i => i.Block.BlockId)
                .ToImmutableArray();

            while (sortedItems.Any())
            {
                //  Materialized once:  the batch is read several times below
                var movingItems = TakeMovingBlocks(sortedItems)
                    .ToImmutableArray();
                var movingExtentIds = movingItems
                    .SelectMany(i => i.Extents.Select(e => e.ExtentId));
                var tags = movingItems
                    .Select(i => i.Block.BlockTag);

                await commandClient.MoveExtentsAsync(
                    priority,
                    tempTableName,
                    activity.DestinationTable.TableName,
                    movingExtentIds,
                    ct);
                await commandClient.CleanExtentTagsAsync(
                    priority,
                    activity.DestinationTable.TableName,
                    tags,
                    ct);

                var newBlockItems = movingItems
                    .Select(i => i.Block.ChangeState(BlockState.ExtentMoved))
                    .ToImmutableArray();

                foreach (var item in newBlockItems)
                {
                    item.BlockTag = string.Empty;
                }
                RowItemGateway.Append(newBlockItems);
                sortedItems = sortedItems
                    .Skip(movingItems.Length)
                    .ToImmutableArray();
            }
        }

        /// <summary>
        /// Returns the longest prefix of <paramref name="items"/> whose cumulated
        /// extent count doesn't exceed <see cref="MAXIMUM_EXTENT_MOVING"/>, but never
        /// fewer than one item when the first item alone exceeds the bound.
        /// </summary>
        /// <param name="items">Candidate blocks, in the order they should be moved.</param>
        /// <returns>
        /// A prefix of <paramref name="items"/> (or <paramref name="items"/> itself
        /// when everything fits).
        /// </returns>
        private IEnumerable<ActivityFlatHierarchy> TakeMovingBlocks(
            IEnumerable<ActivityFlatHierarchy> items)
        {
            var takenCount = 0;
            var totalExtents = 0;

            foreach (var item in items)
            {
                var extentCount = item.Extents.Count();

                if (totalExtents + extentCount > MAXIMUM_EXTENT_MOVING)
                {
                    //  Always take at least one block so the caller makes progress
                    return items.Take(Math.Max(1, takenCount));
                }
                totalExtents += extentCount;
                ++takenCount;
            }

            return items;
        }
    }
}