code/KustoCopyConsole/Runner/MainRunner.cs (184 lines of code) (raw):

using Azure.Core;
using KustoCopyConsole.Entity.InMemory;
using KustoCopyConsole.Entity.RowItems;
using KustoCopyConsole.Entity.RowItems.Keys;
using KustoCopyConsole.Entity.State;
using KustoCopyConsole.JobParameter;
using KustoCopyConsole.Kusto;
using KustoCopyConsole.Storage;
using KustoCopyConsole.Storage.AzureStorage;

namespace KustoCopyConsole.Runner
{
    /// <summary>
    /// Top-level runner. <see cref="CreateAsync"/> initializes the checkpoint storage,
    /// the row-item gateway and the Kusto clients. <see cref="RunAsync"/> then makes sure
    /// every configured activity has an activity row and an iteration. Finally it runs
    /// all stage runners concurrently.
    /// </summary>
    internal class MainRunner : RunnerBase, IAsyncDisposable
    {
        #region Constructors
        /// <summary>
        /// Builds a <see cref="MainRunner"/>, reporting each initialization step on the console.
        /// </summary>
        /// <param name="appVersion">Application version, passed to the log storage.</param>
        /// <param name="parameterization">Job parameters (credentials, staging directories, activities).</param>
        /// <param name="traceApplicationName">Application name passed to the Kusto client factory.</param>
        /// <param name="ct">Cancellation token flowed to every asynchronous initialization step.</param>
        /// <returns>A fully initialized runner.</returns>
        internal static async Task<MainRunner> CreateAsync(
            Version appVersion,
            MainJobParameterization parameterization,
            string traceApplicationName,
            CancellationToken ct)
        {
            var credentials = parameterization.CreateCredentials();
            // The checkpoint log is stored in the FIRST staging directory only.
            // All staging directories are used for staging blobs (see below).
            var fileSystem = new AzureBlobFileSystem(
                parameterization.StagingStorageDirectories.First(),
                credentials);

            Console.Write("Initialize storage...");
            var logStorage = await LogStorage.CreateAsync(fileSystem, appVersion, ct);
            Console.WriteLine(" Done");

            Console.Write("Reading checkpoint logs...");
            var rowItemGateway = await RowItemGateway.CreateAsync(logStorage, ct);
            Console.WriteLine(" Done");

            Console.Write("Initialize Kusto connections...");
            var dbClientFactory = await DbClientFactory.CreateAsync(
                parameterization,
                credentials,
                traceApplicationName,
                ct);
            Console.WriteLine(" Done");

            var stagingBlobUriProvider = new AzureBlobUriProvider(
                parameterization.StagingStorageDirectories.Select(s => new Uri(s)),
                credentials);

            return new MainRunner(
                parameterization,
                credentials,
                rowItemGateway,
                dbClientFactory,
                stagingBlobUriProvider);
        }

        /// <summary>Private: instances are created through <see cref="CreateAsync"/>.</summary>
        /// <remarks>
        /// NOTE(review): the meaning of the <c>TimeSpan.Zero</c> argument is defined by
        /// <see cref="RunnerBase"/>, which is not visible here.
        /// </remarks>
        private MainRunner(
            MainJobParameterization parameterization,
            TokenCredential credential,
            RowItemGateway rowItemGateway,
            DbClientFactory dbClientFactory,
            IStagingBlobUriProvider stagingBlobUriProvider)
            : base(
                  parameterization,
                  credential,
                  rowItemGateway,
                  dbClientFactory,
                  stagingBlobUriProvider,
                  TimeSpan.Zero)
        {
        }
        #endregion

        /// <summary>
        /// Disposes the row-item gateway, then the Kusto client factory.
        /// </summary>
        /// <remarks>
        /// The factory is disposed in a <c>finally</c> block so that a failure while
        /// disposing the gateway does not leak it.
        /// </remarks>
        async ValueTask IAsyncDisposable.DisposeAsync()
        {
            try
            {
                await ((IAsyncDisposable)RowItemGateway).DisposeAsync();
            }
            finally
            {
                ((IDisposable)DbClientFactory).Dispose();
            }
        }

        /// <summary>
        /// Runs the copy job through these steps:
        /// <list type="number">
        /// <item>Prints the iterations that are not yet completed.</item>
        /// <item>Ensures each configured activity has an activity row and an iteration.</item>
        /// <item>Starts every stage runner on the thread pool.</item>
        /// <item>Awaits the runners through <c>TaskHelper.WhenAllWithErrors</c>.</item>
        /// </list>
        /// </summary>
        /// <param name="ct">Cancellation token flowed to the progress bar and every stage runner.</param>
        public async Task RunAsync(CancellationToken ct)
        {
            DisplayExistingIterations();

            // The progress bar stays alive (and is disposed) around the whole run.
            await using (var progressBar = new ProgressBar(RowItemGateway, ct))
            {
                foreach (var a in Parameterization.Activities.Values)
                {
                    // Order matters: the activity row is appended before its first iteration.
                    EnsureActivity(a);
                    EnsureIteration(a);
                }

                var planningRunner = new PlanningRunner(
                    Parameterization,
                    Credential,
                    RowItemGateway,
                    DbClientFactory,
                    StagingBlobUriProvider);
                var tempTableRunner = new TempTableCreatingRunner(
                    Parameterization,
                    Credential,
                    RowItemGateway,
                    DbClientFactory,
                    StagingBlobUriProvider);
                var exportingRunner = new ExportingRunner(
                    Parameterization,
                    Credential,
                    RowItemGateway,
                    DbClientFactory,
                    StagingBlobUriProvider);
                var awaitExportedRunner = new AwaitExportedRunner(
                    Parameterization,
                    Credential,
                    RowItemGateway,
                    DbClientFactory,
                    StagingBlobUriProvider);
                var queueIngestRunner = new QueueIngestRunner(
                    Parameterization,
                    Credential,
                    RowItemGateway,
                    DbClientFactory,
                    StagingBlobUriProvider);
                var awaitIngestRunner = new AwaitIngestRunner(
                    Parameterization,
                    Credential,
                    RowItemGateway,
                    DbClientFactory,
                    StagingBlobUriProvider);
                var iterationCompletingRunner = new IterationCompletingRunner(
                    Parameterization,
                    Credential,
                    RowItemGateway,
                    DbClientFactory,
                    StagingBlobUriProvider);

                await TaskHelper.WhenAllWithErrors(
                    Task.Run(() => planningRunner.RunAsync(ct)),
                    Task.Run(() => tempTableRunner.RunAsync(ct)),
                    Task.Run(() => exportingRunner.RunAsync(ct)),
                    Task.Run(() => awaitExportedRunner.RunAsync(ct)),
                    Task.Run(() => queueIngestRunner.RunAsync(ct)),
                    Task.Run(() => awaitIngestRunner.RunAsync(ct)),
                    Task.Run(() => iterationCompletingRunner.RunAsync(ct)));
            }
        }

        /// <summary>
        /// Writes one console line describing an iteration. The line contains "New" or
        /// "Existing", the iteration key and the cursor range.
        /// </summary>
        /// <param name="item">Iteration to describe.</param>
        /// <param name="isNew"><c>true</c> if the iteration was just created by this run.</param>
        private static void DisplayIteration(IterationRowItem item, bool isNew)
        {
            var iterationAge = isNew ? "New" : "Existing";

            Console.WriteLine(
                $"{iterationAge} iteration {item.GetIterationKey()}: " +
                $"['{item.CursorStart}', '{item.CursorEnd}']");
        }

        /// <summary>
        /// Writes every cached iteration, across all activities, that is not in the
        /// <see cref="IterationState.Completed"/> state.
        /// </summary>
        private void DisplayExistingIterations()
        {
            var cache = RowItemGateway.InMemoryCache;
            var existingIterations = cache.ActivityMap
                .Values
                .SelectMany(a => a.IterationMap.Values)
                .Select(i => i.RowItem)
                .Where(i => i.State != IterationState.Completed);

            foreach (var iteration in existingIterations)
            {
                DisplayIteration(iteration, false);
            }
        }

        /// <summary>
        /// Starts the first iteration of an activity when the in-memory cache holds no
        /// iteration for it.
        /// </summary>
        /// <param name="activityParam">Activity whose iteration should exist.</param>
        /// <exception cref="NotSupportedException">
        /// The activity's export mode is anything other than <see cref="ExportMode.BackfillOnly"/>.
        /// </exception>
        private void EnsureIteration(ActivityParameterization activityParam)
        {
            if (activityParam.TableOption.ExportMode != ExportMode.BackfillOnly)
            {
                throw new NotSupportedException(
                    $"'{activityParam.TableOption.ExportMode}' isn't supported yet");
            }

            var cache = RowItemGateway.InMemoryCache;
            var hasIterations =
                cache.ActivityMap.TryGetValue(activityParam.ActivityName, out var activityCache)
                && activityCache.IterationMap.Values.Any();

            // A new iteration is only started when the activity has none.
            // It therefore never has a predecessor to chain from:
            // its id is 1 and its cursor range starts empty.
            if (!hasIterations)
            {
                var newIterationItem = new IterationRowItem
                {
                    State = IterationState.Starting,
                    ActivityName = activityParam.ActivityName,
                    IterationId = 1,
                    CursorStart = string.Empty,
                    CursorEnd = string.Empty
                };

                RowItemGateway.Append(newIterationItem);
                DisplayIteration(newIterationItem, true);
            }
        }

        /// <summary>
        /// Appends an <see cref="ActivityState.Active"/> activity row for the given activity
        /// if the in-memory cache has no entry for its name, and reports it on the console.
        /// </summary>
        /// <param name="activityParam">Activity to register.</param>
        private void EnsureActivity(ActivityParameterization activityParam)
        {
            if (!RowItemGateway.InMemoryCache.ActivityMap.ContainsKey(activityParam.ActivityName))
            {
                var activity = new ActivityRowItem
                {
                    State = ActivityState.Active,
                    ActivityName = activityParam.ActivityName,
                    SourceTable = activityParam.Source.GetTableIdentity(),
                    DestinationTable = activityParam.GetEffectiveDestinationTableIdentity()
                };

                RowItemGateway.Append(activity);
                Console.WriteLine($"New activity: '{activity.ActivityName}'");
            }
        }
    }
}