code/KustoCopyConsole/Runner/PlanningRunner.cs (358 lines of code):

using Azure.Core;
using KustoCopyConsole.Entity.RowItems;
using KustoCopyConsole.Entity.State;
using KustoCopyConsole.JobParameter;
using KustoCopyConsole.Kusto;
using KustoCopyConsole.Storage;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Linq;

namespace KustoCopyConsole.Runner
{
    internal class PlanningRunner : RunnerBase
    {
        #region Inner Types
        // A candidate block built from ingestion-time distribution statistics,
        // not yet persisted as a BlockRowItem
        private record ProtoBlock(
            DateTime IngestionTimeStart,
            DateTime IngestionTimeEnd,
            long RowCount,
            DateTime MinCreationTime,
            DateTime MaxCreationTime)
        {
            public ProtoBlock Merge(ProtoBlock other)
            {
                return new ProtoBlock(
                    Min(IngestionTimeStart, other.IngestionTimeStart),
                    Max(IngestionTimeEnd, other.IngestionTimeEnd),
                    RowCount + other.RowCount,
                    Min(MinCreationTime, other.MinCreationTime),
                    Max(MaxCreationTime, other.MaxCreationTime));
            }

            public TimeSpan? CreationTimeDelta => MaxCreationTime - MinCreationTime;

            private static DateTime Min(DateTime a, DateTime b)
            {
                return a < b ? a : b;
            }

            private static DateTime Max(DateTime a, DateTime b)
            {
                return a > b ? a : b;
            }
        }

        // Holds proto blocks being compacted:  completed blocks ready to be
        // planned plus an optional trailing block that may still be merged
        // with later statistics
        private class ProtoBlockCollection
        {
            private readonly IImmutableList<ProtoBlock> _completedBlocks;
            private readonly ProtoBlock? _remainingBlock;

            private ProtoBlockCollection(
                IEnumerable<ProtoBlock> completedBlocks,
                ProtoBlock? remainingBlock)
            {
                _completedBlocks = completedBlocks.ToImmutableArray();
                _remainingBlock = remainingBlock;
            }

            public static ProtoBlockCollection Empty { get; }
                = new(Array.Empty<ProtoBlock>(), null);

            public DateTime? LastIngestionTimeEnd()
            {
                var allBlocks = _remainingBlock == null
                    ? _completedBlocks
                    : _completedBlocks.Append(_remainingBlock);
                var lastIngestionTimeEndBlock = allBlocks.Any()
                    ? allBlocks.ArgMax(b => b.IngestionTimeEnd)
                    : null;

                return lastIngestionTimeEndBlock?.IngestionTimeEnd;
            }

            // Merges the incoming proto blocks (and the current remaining block)
            // pairwise, in ingestion-time order, while the merge criteria allow
            public ProtoBlockCollection Add(IEnumerable<ProtoBlock> blocks)
            {
                var remainingBlocks = _remainingBlock == null
                    ? blocks
                    : blocks.Prepend(_remainingBlock);
                // We sort descending since the stack serves them upside-down
                var stack = new Stack<ProtoBlock>(remainingBlocks
                    .OrderByDescending(d => d.IngestionTimeStart)
                    .ThenByDescending(d => d.IngestionTimeEnd));
                var completedBlocks = new List<ProtoBlock>(_completedBlocks);

                while (stack.Any())
                {
                    var first = stack.Pop();

                    if (stack.Any())
                    {
                        var second = stack.Pop();
                        var merge = first.Merge(second);

                        if (first.IngestionTimeEnd == second.IngestionTimeStart
                            || (merge.RowCount <= RECORDS_PER_BLOCK
                            && merge.CreationTimeDelta < TimeSpan.FromDays(1)))
                        {
                            // We merge
                            stack.Push(merge);
                        }
                        else
                        {
                            // We don't merge
                            completedBlocks.Add(first);
                            stack.Push(second);
                        }
                    }
                    else
                    {
                        // Only way out once the while loop was entered:
                        // the last block stays incomplete
                        return new ProtoBlockCollection(completedBlocks, first);
                    }
                }

                // If the while loop was never entered, there is no remaining block
                return new ProtoBlockCollection(completedBlocks, null);
            }

            public (IEnumerable<ProtoBlock> Blocks, ProtoBlockCollection Collection) PopCompletedBlocks(
                bool includeIncompleteBlock)
            {
                if (includeIncompleteBlock)
                {
                    return (
                        _remainingBlock == null
                        ? _completedBlocks
                        : _completedBlocks.Append(_remainingBlock),
                        ProtoBlockCollection.Empty);
                }
                else
                {
                    return (
                        _completedBlocks,
                        new ProtoBlockCollection(Array.Empty<ProtoBlock>(), _remainingBlock));
                }
            }
        }

        private record BatchExportBlock(
            IEnumerable<Task> exportingTasks,
            long nextBlockId,
            DateTime? nextIngestionTimeStart);
        #endregion

        private const int MAX_STATS_COUNT = 250000;
        private const long RECORDS_PER_BLOCK = 1048576;

        public PlanningRunner(
            MainJobParameterization parameterization,
            TokenCredential credential,
            RowItemGateway rowItemGateway,
            DbClientFactory dbClientFactory,
            IStagingBlobUriProvider stagingBlobUriProvider)
            : base(
                  parameterization,
                  credential,
                  rowItemGateway,
                  dbClientFactory,
                  stagingBlobUriProvider,
                  TimeSpan.FromSeconds(5))
        {
        }

        public async Task RunAsync(CancellationToken ct)
        {
            var tasks = Parameterization
                .Activities
                .Keys
                .Select(a => Task.Run(() => RunActivityAsync(a, ct)))
                .ToImmutableArray();

            await TaskHelper.WhenAllWithErrors(tasks);
        }

        // Plans new iterations of the given activity until all activities complete
        private async Task RunActivityAsync(string activityName, CancellationToken ct)
        {
            while (!AllActivitiesCompleted())
            {
                if (RowItemGateway.InMemoryCache.ActivityMap.TryGetValue(
                    activityName,
                    out var activity))
                {
                    var newIterations = activity.IterationMap
                        .Values
                        .Select(i => i.RowItem)
                        .Where(i => i.State <= IterationState.Planning)
                        .Select(i => new
                        {
                            Key = i.GetIterationKey(),
                            Iteration = i
                        });

                    foreach (var o in newIterations)
                    {
                        await PlanIterationAsync(o.Iteration, ct);
                    }
                }
                // Sleep
                await SleepAsync(ct);
            }
        }

        // Captures the database cursor (if the iteration is starting), then
        // validates ingestion times and plans the iteration's blocks
        private async Task PlanIterationAsync(
            IterationRowItem iterationItem,
            CancellationToken ct)
        {
            var activity = RowItemGateway.InMemoryCache
                .ActivityMap[iterationItem.ActivityName]
                .RowItem;
            var queryClient = DbClientFactory.GetDbQueryClient(
                activity.SourceTable.ClusterUri,
                activity.SourceTable.DatabaseName);
            var dbCommandClient = DbClientFactory.GetDbCommandClient(
                activity.SourceTable.ClusterUri,
                activity.SourceTable.DatabaseName);

            if (iterationItem.State == IterationState.Starting)
            {
                var cursor = await queryClient.GetCurrentCursorAsync(
                    new KustoPriority(iterationItem.GetIterationKey()),
                    ct);

                iterationItem = iterationItem.ChangeState(IterationState.Planning);
                iterationItem.CursorEnd = cursor;
                RowItemGateway.Append(iterationItem);
            }
            await ValidateIngestionTimeAsync(queryClient, activity, iterationItem, ct);
            await PlanBlocksAsync(queryClient, dbCommandClient, iterationItem, ct);
        }

        private async Task ValidateIngestionTimeAsync(
            DbQueryClient queryClient,
            ActivityRowItem activity,
            IterationRowItem iterationItem,
            CancellationToken ct)
        {
            var activityParam = Parameterization.Activities[iterationItem.ActivityName];
            var hasNullIngestionTime = await queryClient.HasNullIngestionTime(
                new KustoPriority(iterationItem.GetIterationKey()),
                activity.SourceTable.TableName,
                activityParam.KqlQuery,
                ct);

            if (hasNullIngestionTime)
            {
                throw new CopyException(
                    $"Activity '{activity.ActivityName}' / Iteration" +
                    $" {iterationItem.IterationId}: null ingestion times are present." +
                    $" Null ingestion times aren't supported.",
                    false);
            }
        }

        // Plans blocks for the iteration batch by batch until the source
        // statistics are exhausted
        private async Task PlanBlocksAsync(
            DbQueryClient queryClient,
            DbCommandClient dbCommandClient,
            IterationRowItem iterationItem,
            CancellationToken ct)
        {
            var protoBlocks = ProtoBlockCollection.Empty;

            // Loop on block batches
            while (iterationItem.State == IterationState.Planning)
            {
                var blockMap = RowItemGateway.InMemoryCache
                    .ActivityMap[iterationItem.ActivityName]
                    .IterationMap[iterationItem.IterationId]
                    .BlockMap;
                var lastBlock = blockMap.Any()
                    ? blockMap.Values.ArgMax(b => b.RowItem.BlockId).RowItem
                    : null;
                var newProtoBlocks = await GetProtoBlockAsync(
                    iterationItem,
                    protoBlocks.LastIngestionTimeEnd() ?? lastBlock?.IngestionTimeEnd,
                    queryClient,
                    dbCommandClient,
                    ct);
                protoBlocks = protoBlocks.Add(newProtoBlocks);
                Trace.TraceInformation($"Planning {iterationItem.GetIterationKey()}: " +
                    $"{newProtoBlocks.Count} new protoblocks compacted into " +
                    $"{protoBlocks.PopCompletedBlocks(false).Blocks.Count()} blocks");
                protoBlocks = PlanBlockBatch(
                    protoBlocks,
                    !newProtoBlocks.Any(),
                    iterationItem.ActivityName,
                    iterationItem.IterationId,
                    lastBlock?.BlockId ?? 0);
                if (!newProtoBlocks.Any())
                {
                    var isAnyBlock = RowItemGateway.InMemoryCache
                        .ActivityMap[iterationItem.ActivityName]
                        .IterationMap[iterationItem.IterationId]
                        .BlockMap
                        .Any();

                    iterationItem = isAnyBlock
                        ? iterationItem.ChangeState(IterationState.Planned)
                        : iterationItem.ChangeState(IterationState.Completed);
                    RowItemGateway.Append(iterationItem);
                }
            }
        }

        // Persists the popped proto blocks as planned BlockRowItems and returns
        // the collection of proto blocks still being compacted
        private ProtoBlockCollection PlanBlockBatch(
            ProtoBlockCollection protoBlocks,
            bool includeIncompleteBlock,
            string activityName,
            long iterationId,
            long lastBlockId)
        {
            (var blocks, var newProtoBlocks) = protoBlocks.PopCompletedBlocks(includeIncompleteBlock);

            foreach (var block in blocks)
            {
                var blockItem = new BlockRowItem
                {
                    State = BlockState.Planned,
                    ActivityName = activityName,
                    IterationId = iterationId,
                    BlockId = ++lastBlockId,
                    IngestionTimeStart = block.IngestionTimeStart,
                    IngestionTimeEnd = block.IngestionTimeEnd,
                    MinCreationTime = block.MinCreationTime,
                    MaxCreationTime = block.MaxCreationTime,
                    PlannedRowCount = block.RowCount
                };

                RowItemGateway.Append(blockItem);
            }

            return newProtoBlocks;
        }

        // Merge results from query + show extents command
        private async Task<IImmutableList<ProtoBlock>> GetProtoBlockAsync(
            IterationRowItem iterationItem,
            DateTime? ingestionTimeStart,
            DbQueryClient queryClient,
            DbCommandClient dbCommandClient,
            CancellationToken ct)
        {
            var activityItem = RowItemGateway.InMemoryCache
                .ActivityMap[iterationItem.ActivityName]
                .RowItem;
            var activityParam = Parameterization.Activities[iterationItem.ActivityName];
            var distributions = await queryClient.GetRecordDistributionAsync(
                new KustoPriority(iterationItem.GetIterationKey()),
                activityItem.SourceTable.TableName,
                activityParam.KqlQuery,
                iterationItem.CursorStart,
                iterationItem.CursorEnd,
                ingestionTimeStart,
                MAX_STATS_COUNT,
                ct);

            if (distributions.Any())
            {
                var extentIds = distributions
                    .Select(d => d.ExtentId)
                    // Exclude the empty extent-id (for row-store rows)
                    .Where(id => !string.IsNullOrWhiteSpace(id))
                    .Distinct();
                var extentDates = await dbCommandClient.GetExtentDatesAsync(
                    new KustoPriority(iterationItem.GetIterationKey()),
                    activityItem.SourceTable.TableName,
                    extentIds,
                    ct);
                var extentDateMap = extentDates
                    .ToImmutableDictionary(e => e.ExtentId, e => e.MinCreatedOn);

                // Check for a race condition where extents got merged between the
                // two queries and some extent ids no longer exist
                if (extentDates.Count == extentIds.Count())
                {
                    var protoBlocks = distributions
                        .Select(d =>
                        {
                            // Rows without an extent id (row store) fall back to
                            // the default creation time
                            extentDateMap.TryGetValue(d.ExtentId, out DateTime creationTime);

                            return new ProtoBlock(
                                d.IngestionTimeStart,
                                d.IngestionTimeEnd,
                                d.RowCount,
                                creationTime,
                                creationTime);
                        })
                        .ToImmutableArray();

                    return protoBlocks;
                }
                else
                {
                    return await GetProtoBlockAsync(
                        iterationItem,
                        ingestionTimeStart,
                        queryClient,
                        dbCommandClient,
                        ct);
                }
            }
            else
            {
                return ImmutableArray<ProtoBlock>.Empty;
            }
        }
    }
}
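For reference, a minimal standalone sketch of the compaction rule applied in ProtoBlockCollection.Add: two ingestion-time-sorted proto blocks are merged when they are contiguous in ingestion time, or when the merged block stays at or below RECORDS_PER_BLOCK rows and spans less than one day of extent-creation time. The Block record and ShouldMerge helper below are hypothetical names introduced for illustration; only the thresholds mirror the constants in PlanningRunner.

using System;

internal static class CompactionSketch
{
    private const long RECORDS_PER_BLOCK = 1048576;

    // Stand-in for PlanningRunner's private ProtoBlock record
    internal record Block(
        DateTime IngestionTimeStart,
        DateTime IngestionTimeEnd,
        long RowCount,
        DateTime MinCreationTime,
        DateTime MaxCreationTime);

    // 'first' and 'second' are assumed sorted by ingestion time; they merge when
    // contiguous, or when the merged block is small enough and its extents were
    // created within a single day
    internal static bool ShouldMerge(Block first, Block second)
    {
        var mergedRowCount = first.RowCount + second.RowCount;
        var creationDelta =
            Max(first.MaxCreationTime, second.MaxCreationTime)
            - Min(first.MinCreationTime, second.MinCreationTime);

        return first.IngestionTimeEnd == second.IngestionTimeStart
            || (mergedRowCount <= RECORDS_PER_BLOCK
                && creationDelta < TimeSpan.FromDays(1));
    }

    private static DateTime Min(DateTime a, DateTime b) => a < b ? a : b;

    private static DateTime Max(DateTime a, DateTime b) => a > b ? a : b;
}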