code/KustoCopyConsole/Runner/ExportingRunner.cs (271 lines of code) (raw):
using Azure.Core;
using Kusto.Cloud.Platform.Utils;
using KustoCopyConsole.Entity.InMemory;
using KustoCopyConsole.Entity.RowItems;
using KustoCopyConsole.Entity.RowItems.Keys;
using KustoCopyConsole.Entity.State;
using KustoCopyConsole.JobParameter;
using KustoCopyConsole.Kusto;
using KustoCopyConsole.Storage;
using System;
using System.Collections.Immutable;
using System.Linq;
namespace KustoCopyConsole.Runner
{
internal class ExportingRunner : RunnerBase
{
#region Inner Types
// Export capacity remembered for one cluster, together with the time it was recorded.
// CachedTime is compared against CAPACITY_REFRESH_PERIOD to decide when to fetch again.
private record CapacityCache(DateTime CachedTime, int CachedCapacity);
#endregion
// Maximum age of a cached capacity; older entries are fetched again from the cluster.
private static readonly TimeSpan CAPACITY_REFRESH_PERIOD = TimeSpan.FromMinutes(5);
/// <summary>
/// Creates the runner in charge of starting block exports. Every argument is forwarded
/// unchanged to the <see cref="RunnerBase"/> constructor; this class adds no state of its own.
/// </summary>
/// <param name="parameterization">Job parameterization, forwarded to the base class.</param>
/// <param name="credential">Token credential, forwarded to the base class.</param>
/// <param name="rowItemGateway">Row item gateway, forwarded to the base class.</param>
/// <param name="dbClientFactory">Database client factory, forwarded to the base class.</param>
/// <param name="stagingBlobUriProvider">Staging blob URI provider, forwarded to the base class.</param>
public ExportingRunner(
MainJobParameterization parameterization,
TokenCredential credential,
RowItemGateway rowItemGateway,
DbClientFactory dbClientFactory,
IStagingBlobUriProvider stagingBlobUriProvider)
: base(
parameterization,
credential,
rowItemGateway,
dbClientFactory,
stagingBlobUriProvider,
// NOTE(review): presumably the pause applied by SleepAsync between two passes of the
// main loop -- confirm against RunnerBase.
TimeSpan.FromSeconds(5))
{
}
/// <summary>
/// Main loop of the runner. Until <c>AllActivitiesCompleted()</c> returns true, each pass
/// gathers the planned blocks per source cluster, makes sure an export capacity is cached
/// for each of those clusters, processes every cluster concurrently and finally awaits
/// <c>SleepAsync</c> before starting over.
/// </summary>
/// <param name="ct">Cancellation token flowed to every asynchronous call.</param>
public async Task RunAsync(CancellationToken ct)
{
    // Lives for the whole run so capacities are not fetched again on every pass
    var capacityByCluster = new Dictionary<Uri, CapacityCache>();

    while (!AllActivitiesCompleted())
    {
        var clusterCandidates = GetCandidatesByCluster();

        // Every cluster with candidates must have a cached capacity before being processed
        await EnsureCapacityCacheAsync(capacityByCluster, clusterCandidates.Keys, ct);

        var clusterTasks = new List<Task<int>>();

        foreach (var (clusterUri, iterationMap) in clusterCandidates)
        {
            clusterTasks.Add(Task.Run(
                () => ProcessExportByClusterAsync(
                    clusterUri,
                    capacityByCluster[clusterUri].CachedCapacity,
                    iterationMap,
                    ct)));
        }

        await Task.WhenAll(clusterTasks);
        await SleepAsync(ct);
    }
}
/// <summary>
/// Collects every block in the <c>Planned</c> state that belongs to a non-completed
/// iteration of a non-completed activity.
/// </summary>
/// <returns>
/// The planned blocks indexed first by source cluster URI, then by iteration key.
/// </returns>
private IImmutableDictionary<Uri, IImmutableDictionary<IterationKey, IImmutableList<BlockRowItem>>>
    GetCandidatesByCluster()
{
    var plannedBlocks = RowItemGateway.InMemoryCache
        .GetActivityFlatHierarchy(
            a => a.RowItem.State != ActivityState.Completed,
            i => i.RowItem.State != IterationState.Completed)
        .Where(h => h.Block.State == BlockState.Planned);

    return plannedBlocks
        .GroupBy(h => h.Activity.SourceTable.ClusterUri)
        .ToImmutableDictionary(
            clusterGroup => clusterGroup.Key,
            clusterGroup => ToIterationMap(clusterGroup));

    // Regroups the blocks of a single cluster by the iteration they belong to
    static IImmutableDictionary<IterationKey, IImmutableList<BlockRowItem>> ToIterationMap(
        IEnumerable<ActivityFlatHierarchy> clusterItems)
    {
        return clusterItems
            .GroupBy(
                h => new IterationKey(h.Activity.ActivityName, h.Iteration.IterationId),
                h => h.Block)
            .ToImmutableDictionary(
                iterationGroup => iterationGroup.Key,
                iterationGroup => (IImmutableList<BlockRowItem>)iterationGroup.ToImmutableArray());
    }
}
/// <summary>
/// Makes sure <paramref name="capacityMap"/> holds, for each given cluster, an export
/// capacity that is not older than <see cref="CAPACITY_REFRESH_PERIOD"/>. Missing or
/// expired entries are fetched concurrently and written back to the map.
/// </summary>
/// <param name="capacityMap">Capacity cache, updated in place.</param>
/// <param name="clusterUris">Clusters for which a capacity is required.</param>
/// <param name="ct">Cancellation token flowed to the capacity queries.</param>
private async Task EnsureCapacityCacheAsync(
    IDictionary<Uri, CapacityCache> capacityMap,
    IEnumerable<Uri> clusterUris,
    CancellationToken ct)
{
    // Cache age is measured in UTC: local time jumps on daylight-saving / time-zone
    // changes, which would either expire every entry at once or keep stale ones alive.
    var now = DateTime.UtcNow;
    var capacityUpdateTasks = clusterUris
        .Where(u => !capacityMap.TryGetValue(u, out var cached)
            || cached.CachedTime + CAPACITY_REFRESH_PERIOD < now)
        .Select(u => new
        {
            ClusterUri = u,
            CapacityTask = FetchCapacityAsync(u, ct)
        })
        .ToImmutableArray();

    await TaskHelper.WhenAllWithErrors(capacityUpdateTasks.Select(o => o.CapacityTask));

    var fetchedTime = DateTime.UtcNow;

    foreach (var update in capacityUpdateTasks)
    {
        // The tasks were awaited just above, so reading Result does not block
        capacityMap[update.ClusterUri] =
            new CapacityCache(fetchedTime, update.CapacityTask.Result);
    }
}
/// <summary>
/// Starts as many exports as the cluster still has room for. The free capacity is the
/// cluster capacity minus the blocks currently exporting; it is handed out to iterations
/// ordered by activity name, then iteration id.
/// </summary>
/// <param name="clusterUri">Source cluster the blocks are exported from.</param>
/// <param name="capacity">Export capacity of that cluster.</param>
/// <param name="iterationMap">Planned blocks of that cluster, indexed by iteration.</param>
/// <param name="ct">Cancellation token flowed to the export commands.</param>
/// <returns>Number of blocks for which an export was started.</returns>
private async Task<int> ProcessExportByClusterAsync(
    Uri clusterUri,
    int capacity,
    IImmutableDictionary<IterationKey, IImmutableList<BlockRowItem>> iterationMap,
    CancellationToken ct)
{
    var exportingCount = GetExportingCount(clusterUri);
    var freeCapacity = capacity - exportingCount;
    var orderedIterationKeys = iterationMap.Keys
        .OrderBy(k => k.ActivityName)
        .ThenBy(k => k.IterationId);
    var lineup = new List<BlockRowItem>();

    // Get line up by iteration priority
    foreach (var iterationKey in orderedIterationKeys)
    {
        // Always derive what is left from the initial free capacity: subtracting the
        // cumulative lineup size from a running total would count earlier blocks once
        // per iteration and could hand a negative capacity to GetLineup.
        var remainingCapacity = freeCapacity - lineup.Count;

        if (remainingCapacity <= 0)
        {
            break;
        }
        lineup.AddRange(GetLineup(iterationKey, iterationMap[iterationKey], remainingCapacity));
    }

    var startExportTasks = lineup
        .Select(b => Task.Run(() => StartExportAsync(b, ct)))
        .ToImmutableArray();

    await Task.WhenAll(startExportTasks);

    return lineup.Count;
}
/// <summary>
/// Counts the blocks in the <c>Exporting</c> state among the non-completed iterations of
/// the activities whose source table lives on <paramref name="clusterUri"/>.
/// </summary>
/// <param name="clusterUri">Source cluster to count the exporting blocks for.</param>
/// <returns>Number of blocks currently exporting on that cluster.</returns>
private int GetExportingCount(Uri clusterUri)
{
    return RowItemGateway.InMemoryCache
        .GetActivityFlatHierarchy(
            a => a.RowItem.SourceTable.ClusterUri == clusterUri,
            i => i.RowItem.State != IterationState.Completed)
        .Count(h => h.Block.State == BlockState.Exporting);
}
/// <summary>
/// Picks the blocks of one iteration to export next. Once the iteration has
/// <c>MIN_STAT_COUNT</c> exported blocks of meaningful size, their throughput is used to
/// compute a target row count and the candidates are merged towards that size
/// (see <see cref="GetReplannedLineup"/>). Until then the candidates with the earliest
/// ingestion time are returned unchanged.
/// </summary>
/// <param name="iterationKey">Iteration the candidates belong to.</param>
/// <param name="candidates">Planned blocks of that iteration.</param>
/// <param name="freeCapacity">Maximum number of blocks to return.</param>
/// <returns>At most <paramref name="freeCapacity"/> blocks to export.</returns>
private IEnumerable<BlockRowItem> GetLineup(
    IterationKey iterationKey,
    IEnumerable<BlockRowItem> candidates,
    int freeCapacity)
{
    const int MIN_STAT_COUNT = 5;
    const long MIN_ROW_COUNT_STATS = 100000;
    const long MAX_ROW_COUNT = 16000000;
    const int LEAP_RATIO = 3;
    var MAX_EXPORT_DURATION = TimeSpan.FromMinutes(4);

    // Statistics sample: the largest exports done so far in this iteration
    var largestExportedBlocks = RowItemGateway.InMemoryCache
        .ActivityMap[iterationKey.ActivityName]
        .IterationMap[iterationKey.IterationId]
        .BlockMap
        .Values
        .Select(c => c.RowItem)
        .Where(b => b.State >= BlockState.Exported)
        // We want representative export, i.e. meaningful size
        .Where(b => b.ExportedRowCount > MIN_ROW_COUNT_STATS)
        .OrderByDescending(b => b.ExportedRowCount)
        .Take(MIN_STAT_COUNT)
        .ToImmutableArray();

    if (largestExportedBlocks.Length == MIN_STAT_COUNT)
    {   // Replan blocks
        var totalSeconds = largestExportedBlocks.Sum(b => b.ExportDuration!.Value.TotalSeconds);
        var totalRows = largestExportedBlocks.Sum(b => b.ExportedRowCount);
        var maxRowCount = largestExportedBlocks.Max(b => b.ExportedRowCount);
        // Rows exportable within MAX_EXPORT_DURATION at the observed throughput.
        // Computed in doubles: a per-row duration stored in a TimeSpan is quantized to
        // 100 ns ticks, which skews the result (or yields a zero divisor) on fast exports.
        // A zero total duration gives +infinity, which the caps below absorb.
        var rowsWithinMaxDuration = MAX_EXPORT_DURATION.TotalSeconds * totalRows / totalSeconds;
        var targetRowCount = Math.Max(
            1,
            Math.Min(
                // Never grow past the hard cap nor more than LEAP_RATIO times the
                // biggest export observed so far
                Math.Min(MAX_ROW_COUNT, LEAP_RATIO * maxRowCount),
                rowsWithinMaxDuration));

        return GetReplannedLineup(candidates, targetRowCount, freeCapacity);
    }
    else
    {   // Just return the top blocks
        return candidates
            .OrderBy(b => b.IngestionTimeStart)
            .Take(freeCapacity)
            .ToImmutableArray();
    }
}
/// <summary>
/// Builds a lineup of at most <paramref name="freeCapacity"/> blocks by walking the
/// candidates in ascending ingestion-time order and merging each block with its successor
/// for as long as the merged block stays within <paramref name="targetRowCount"/> planned
/// rows and spans less than one day between its min and max creation times.
/// </summary>
/// <param name="candidates">Planned blocks available for export.</param>
/// <param name="targetRowCount">Maximum planned row count of a merged block.</param>
/// <param name="freeCapacity">Maximum number of blocks in the lineup.</param>
/// <returns>The blocks (merged or not) to export.</returns>
private IEnumerable<BlockRowItem> GetReplannedLineup(
    IEnumerable<BlockRowItem> candidates,
    double targetRowCount,
    int freeCapacity)
{
    var maxCreationTimeSpread = TimeSpan.FromDays(1);
    // Pushed in descending order so that the earliest ingestion time is popped first
    var pending = new Stack<BlockRowItem>(
        candidates.OrderByDescending(c => c.IngestionTimeStart));
    var selected = new List<BlockRowItem>(freeCapacity);

    while (selected.Count < freeCapacity && pending.Count > 0)
    {
        var current = pending.Pop();

        if (pending.Count == 0)
        {   // Nothing left to merge with
            selected.Add(current);
            continue;
        }

        var next = pending.Pop();
        var merged = Merge(current, next);
        var isMergeAcceptable = merged.PlannedRowCount <= targetRowCount
            && (merged.MaxCreationTime - merged.MinCreationTime < maxCreationTimeSpread);

        if (isMergeAcceptable)
        {   // Put the merged block back so it can absorb the following block as well
            pending.Push(merged);
        }
        else
        {   // The current block is as big as it gets; the next one starts a new merge
            selected.Add(current);
            pending.Push(next);
        }
    }

    return selected;

    // Combines two consecutive blocks: keeps the identity of the first one, widens the
    // time ranges, adds up planned rows and tracks the absorbed block ids
    static BlockRowItem Merge(BlockRowItem first, BlockRowItem second)
    {
        return new BlockRowItem
        {
            State = first.State,
            ActivityName = first.ActivityName,
            IterationId = first.IterationId,
            BlockId = first.BlockId,
            IngestionTimeStart = first.IngestionTimeStart,
            IngestionTimeEnd = second.IngestionTimeEnd,
            MinCreationTime = first.MinCreationTime < second.MinCreationTime
                ? first.MinCreationTime
                : second.MinCreationTime,
            MaxCreationTime = first.MaxCreationTime > second.MaxCreationTime
                ? first.MaxCreationTime
                : second.MaxCreationTime,
            PlannedRowCount = first.PlannedRowCount + second.PlannedRowCount,
            ReplannedBlockIds = first.ReplannedBlockIds
                .Concat(second.ReplannedBlockIds)
                .Append(second.BlockId)
                .ToImmutableArray(),
        };
    }
}
/// <summary>
/// Starts the export of one block: obtains writable staging folder URIs, sends the export
/// command to the source cluster and appends the block to the gateway in the
/// <c>Exporting</c> state along with the export operation id.
/// </summary>
/// <param name="item">Block to export.</param>
/// <param name="ct">Cancellation token flowed to the asynchronous calls.</param>
private async Task StartExportAsync(BlockRowItem item, CancellationToken ct)
{
    var cachedActivity = RowItemGateway.InMemoryCache.ActivityMap[item.ActivityName];
    var sourceTable = cachedActivity.RowItem.SourceTable;
    var iteration = cachedActivity.IterationMap[item.IterationId].RowItem;
    var blockKey = item.GetBlockKey();
    var commandClient = DbClientFactory.GetDbCommandClient(
        sourceTable.ClusterUri,
        sourceTable.DatabaseName);
    var stagingUris = await StagingBlobUriProvider.GetWritableFolderUrisAsync(blockKey, ct);
    var kqlQuery = Parameterization.Activities[item.ActivityName].KqlQuery;
    var exportOperationId = await commandClient.ExportBlockAsync(
        new KustoPriority(blockKey),
        stagingUris,
        sourceTable.TableName,
        kqlQuery,
        iteration.CursorStart,
        iteration.CursorEnd,
        item.IngestionTimeStart,
        item.IngestionTimeEnd,
        ct);

    // Record the state change only once the export command went through
    var exportingItem = item.ChangeState(BlockState.Exporting);

    exportingItem.ExportOperationId = exportOperationId;
    RowItemGateway.Append(exportingItem);
}
/// <summary>
/// Queries a cluster for its export capacity, at the highest priority.
/// </summary>
/// <param name="clusterUri">Cluster to query.</param>
/// <param name="ct">Cancellation token flowed to the query.</param>
/// <returns>The export capacity reported by the cluster.</returns>
private async Task<int> FetchCapacityAsync(Uri clusterUri, CancellationToken ct)
{
    // The client is created with an empty database name
    return await DbClientFactory
        .GetDbCommandClient(clusterUri, string.Empty)
        .ShowExportCapacityAsync(KustoPriority.HighestPriority, ct);
}
}
}