code/KustoCopyConsole/Runner/IterationCompletingRunner.cs (93 lines of code) (raw):

using Azure.Core; using Azure.Storage.Files.DataLake; using KustoCopyConsole.Entity.RowItems.Keys; using KustoCopyConsole.Entity.State; using KustoCopyConsole.JobParameter; using KustoCopyConsole.Kusto; using KustoCopyConsole.Storage; using System; using System.Collections.Immutable; using System.Linq; namespace KustoCopyConsole.Runner { internal class IterationCompletingRunner : RunnerBase { public IterationCompletingRunner( MainJobParameterization parameterization, TokenCredential credential, RowItemGateway rowItemGateway, DbClientFactory dbClientFactory, IStagingBlobUriProvider stagingBlobUriProvider) : base( parameterization, credential, rowItemGateway, dbClientFactory, stagingBlobUriProvider, TimeSpan.FromSeconds(5)) { } public async Task RunAsync(CancellationToken ct) { while (!AllActivitiesCompleted()) { await CompleteIterationsAsync(ct); CompleteActivities(); // Sleep await SleepAsync(ct); } } private async Task CompleteIterationsAsync(CancellationToken ct) { var completingIterations = RowItemGateway.InMemoryCache .ActivityMap .Values .Where(a => a.RowItem.State != ActivityState.Completed) .SelectMany(a => a.IterationMap.Values) // The iteration is planned, hence all its blocks are in place .Where(i => i.RowItem.State == IterationState.Planned) // All blocks in the iteration are moved .Where(i => !i.BlockMap.Any() || !i.BlockMap.Values.Any(b => b.RowItem.State != BlockState.ExtentMoved)); foreach (var iteration in completingIterations) { if (iteration.TempTable != null && iteration.TempTable.State == TempTableState.Created) { var tableId = RowItemGateway.InMemoryCache .ActivityMap[iteration.RowItem.ActivityName] .RowItem .DestinationTable; var dbClient = DbClientFactory.GetDbCommandClient( tableId.ClusterUri, tableId.DatabaseName); var iterationKey = iteration.RowItem.GetIterationKey(); await dbClient.DropTableIfExistsAsync( new KustoPriority(iterationKey), iteration.TempTable.TempTableName, ct); await StagingBlobUriProvider.DeleteStagingDirectoryAsync(iterationKey, ct); } var newIteration = iteration.RowItem.ChangeState(IterationState.Completed); RowItemGateway.Append(newIteration); } } private void CompleteActivities() { var candidateActivities = RowItemGateway.InMemoryCache .ActivityMap .Values .Where(a => a.RowItem.State != ActivityState.Completed) // There is at least one iteration: exclude iteration-less activities .Where(a => a.IterationMap.Any()) // All iterations are completed .Where(a => !a.IterationMap.Values.Any(i => i.RowItem.State != IterationState.Completed)) .Select(a => a.RowItem); foreach (var activity in candidateActivities) { if (!Parameterization.IsContinuousRun || Parameterization.Activities[activity.ActivityName].TableOption.ExportMode == ExportMode.BackfillOnly) { var newActivity = activity.ChangeState(ActivityState.Completed); RowItemGateway.Append(newActivity); } } } } }