code/KustoCopyConsole/Runner/ProgressBar.cs (86 lines of code) (raw):

using KustoCopyConsole.Entity.RowItems.Keys;
using KustoCopyConsole.Entity.State;
using KustoCopyConsole.Storage;
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace KustoCopyConsole.Runner
{
    /// <summary>
    /// Periodically writes the progress of every in-flight iteration to the console.
    /// A background task is started by the constructor and runs until the instance
    /// is disposed or the supplied cancellation token is cancelled.
    /// </summary>
    internal class ProgressBar : IAsyncDisposable
    {
        /// <summary>Delay between two consecutive progress reports.</summary>
        private static readonly TimeSpan WAKE_PERIOD = TimeSpan.FromSeconds(10);

        private readonly RowItemGateway _rowItemGateway;
        private readonly Task _backgroundTask;
        // Continuations are forced to run asynchronously so that signalling the source
        // from DisposeAsync never executes the background loop's continuation inline.
        private readonly TaskCompletionSource _completionSource =
            new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

        /// <summary>Starts reporting progress in the background.</summary>
        /// <param name="rowItemGateway">Gateway whose in-memory cache is read for each report.</param>
        /// <param name="ct">Token that stops the background reporting when cancelled.</param>
        public ProgressBar(RowItemGateway rowItemGateway, CancellationToken ct)
        {
            _rowItemGateway = rowItemGateway;
            _backgroundTask = Task.Run(() => BackgroundRunAsync(ct));
        }

        /// <summary>
        /// Signals the background loop to stop and waits for it to finish.
        /// Any exception thrown by the background loop surfaces here.
        /// </summary>
        async ValueTask IAsyncDisposable.DisposeAsync()
        {
            _completionSource.TrySetResult();
            await _backgroundTask;
        }

        /// <summary>
        /// Reports progress, then sleeps for <see cref="WAKE_PERIOD"/> (or until disposal /
        /// cancellation), repeatedly.
        /// </summary>
        /// <param name="ct">Token that ends the loop when cancelled.</param>
        private async Task BackgroundRunAsync(CancellationToken ct)
        {
            var iterationBag = (IImmutableSet<IterationKey>)ImmutableHashSet<IterationKey>.Empty;

            // The cancellation check is required:  once the token is cancelled,
            // Task.Delay completes immediately (as cancelled) and WhenAny returns
            // right away, which would otherwise turn this loop into a busy spin.
            while (!_completionSource.Task.IsCompleted && !ct.IsCancellationRequested)
            {
                iterationBag = ReportProgress(iterationBag);
                // WhenAny never throws for a cancelled delay; it simply returns it.
                await Task.WhenAny(
                    Task.Delay(WAKE_PERIOD, ct),
                    _completionSource.Task);
            }
        }

        /// <summary>
        /// Writes one progress line per iteration that is not completed and belongs to an
        /// active activity, and a "Completed" line for every iteration that was reported
        /// in the previous round but is no longer in that set.
        /// </summary>
        /// <param name="iterationBag">Keys of the iterations reported in the previous round.</param>
        /// <returns>Keys of the iterations reported in this round.</returns>
        private IImmutableSet<IterationKey> ReportProgress(
            IImmutableSet<IterationKey> iterationBag)
        {
            var activeIterations = _rowItemGateway.InMemoryCache
                .ActivityMap
                .Values
                .Where(a => a.RowItem.State == ActivityState.Active)
                .SelectMany(a => a.IterationMap.Values)
                .Where(i => i.RowItem.State != IterationState.Completed)
                .Select(i => i.RowItem.GetIterationKey())
                .ToImmutableHashSet();

            foreach (var key in iterationBag.Except(activeIterations))
            {
                Console.WriteLine(
                    $"Completed ({key.ActivityName}, {key.IterationId})");
            }
            foreach (var key in activeIterations)
            {
                ReportIterationProgress(key);
            }

            return activeIterations;
        }

        /// <summary>
        /// Writes a single console line with the per-state block counts and the
        /// exported / planned row totals of one iteration.
        /// </summary>
        /// <param name="key">Key of the iteration to report on.</param>
        private void ReportIterationProgress(IterationKey key)
        {
            var iterationCache = _rowItemGateway.InMemoryCache
                .ActivityMap[key.ActivityName]
                .IterationMap[key.IterationId];
            var blockMap = iterationCache.BlockMap;
            // Materialised once:  the sequence is walked several times below and a
            // deferred Select would re-run the projection on every pass.
            var blockItems = blockMap.Values
                .Select(b => b.RowItem)
                .ToList();

            int CountInState(BlockState state) => blockItems.Count(b => b.State == state);

            var plannedCount = CountInState(BlockState.Planned);
            var exportingCount = CountInState(BlockState.Exporting);
            var exportedCount = CountInState(BlockState.Exported);
            var queuedCount = CountInState(BlockState.Queued);
            var ingestedCount = CountInState(BlockState.Ingested);
            var movedCount = CountInState(BlockState.ExtentMoved);
            var plannedRowCount = blockItems.Sum(b => b.PlannedRowCount);
            var exportedRowCount = blockItems.Sum(b => b.ExportedRowCount);

            Console.WriteLine(
                $"Progress {key} [{iterationCache.RowItem.State}]: " +
                $"Total={blockMap.Count}, Planned={plannedCount}, " +
                $"Exporting={exportingCount}, Exported={exportedCount}, " +
                $"Queued={queuedCount}, Ingested={ingestedCount}, " +
                $"Moved={movedCount} " +
                $"({exportedRowCount:N0} / {plannedRowCount:N0})");
        }
    }
}