code/KustoCopyConsole/Runner/QueueIngestRunner.cs (128 lines of code) (raw):

using Azure.Core; using KustoCopyConsole.Entity.InMemory; using KustoCopyConsole.Entity.RowItems; using KustoCopyConsole.Entity.State; using KustoCopyConsole.JobParameter; using KustoCopyConsole.Kusto; using KustoCopyConsole.Storage; using System.Collections.Immutable; using System.Diagnostics; using System.Linq; namespace KustoCopyConsole.Runner { internal class QueueIngestRunner : RunnerBase { public QueueIngestRunner( MainJobParameterization parameterization, TokenCredential credential, RowItemGateway rowItemGateway, DbClientFactory dbClientFactory, IStagingBlobUriProvider stagingBlobUriProvider) : base( parameterization, credential, rowItemGateway, dbClientFactory, stagingBlobUriProvider, TimeSpan.FromSeconds(5)) { } public async Task RunAsync(CancellationToken ct) { // Clean half-queued URLs CleanQueuingUrls(); while (!AllActivitiesCompleted()) { var allBlocks = RowItemGateway.InMemoryCache.GetActivityFlatHierarchy( a => a.RowItem.State != ActivityState.Completed, i => i.RowItem.State != IterationState.Completed); var exportedBlocks = allBlocks .Where(h => h.Block.State == BlockState.Exported); var ingestionTasks = exportedBlocks .OrderBy(h => h.Activity.ActivityName) .ThenBy(h => h.Block.IterationId) .ThenBy(h => h.Block.BlockId) .Select(h => QueueIngestBlockAsync(h, ct)) .ToImmutableArray(); await TaskHelper.WhenAllWithErrors(ingestionTasks); if (!ingestionTasks.Any()) { // Sleep await SleepAsync(ct); } } } private void CleanQueuingUrls() { var allBlocks = RowItemGateway.InMemoryCache.GetActivityFlatHierarchy( a => a.RowItem.State != ActivityState.Completed, i => i.RowItem.State != IterationState.Completed); var queuingBlocks = allBlocks .Where(h => h.Block.State == BlockState.Exported) .Where(h => h.Urls.Any(u => u.State == UrlState.Queued)); var queuingUrls = queuingBlocks .SelectMany(h => h.Urls); foreach (var block in queuingBlocks) { foreach (var url in block.Urls.Where(u => u.State == UrlState.Queued)) { var newUrlItem = url.ChangeState(UrlState.Exported); RowItemGateway.Append(newUrlItem); } } } private async Task QueueIngestBlockAsync(ActivityFlatHierarchy item, CancellationToken ct) { UrlRowItem MarkUrlAsQueued(UrlRowItem url, string serializedQueueResult) { var newUrl = url.ChangeState(UrlState.Queued); newUrl.SerializedQueuedResult = serializedQueueResult; return newUrl; } // It's possible, although unlikely, the temp table hasn't been created yet // If so, we'll process this block later if (item.TempTable != null) { var ingestClient = DbClientFactory.GetIngestClient( item.Activity.DestinationTable.ClusterUri, item.Activity.DestinationTable.DatabaseName, item.TempTable!.TempTableName); var blockTag = $"drop-by:kusto-copy|{Guid.NewGuid()}"; var newBlockItem = item.Block.ChangeState(BlockState.Queued); newBlockItem.BlockTag = blockTag; Trace.TraceInformation($"Block {item.Block.GetBlockKey()}: ingest " + $"{item.Urls.Count()} urls"); var queuingTasks = item .Urls .Select(u => new { Url = u, Task = QueueIngestUrlAsync( ingestClient, newBlockItem, new Uri(u.Url), ct) }) .ToImmutableArray(); await TaskHelper.WhenAllWithErrors(queuingTasks.Select(o => o.Task)); var newUrlItems = queuingTasks .Select(o => MarkUrlAsQueued(o.Url, o.Task.Result)); RowItemGateway.Append(newUrlItems); RowItemGateway.Append(newBlockItem); Trace.TraceInformation($"Block {item.Block.GetBlockKey()}: " + $"{item.Urls.Count()} urls queued"); } } private async Task<string> QueueIngestUrlAsync( IngestClient ingestClient, BlockRowItem block, Uri blobUrl, CancellationToken ct) { var uri = await StagingBlobUriProvider.AuthorizeUriAsync(blobUrl, ct); var serializedQueueResult = await ingestClient.QueueBlobAsync( new KustoPriority(block.GetBlockKey()), uri, block.BlockTag, block.MinCreationTime, ct); return serializedQueueResult; } } }