in code/KustoCopyConsole/Kusto/DbClientFactory.cs [23:81]
/// <summary>
/// Creates a <see cref="DbClientFactory"/> by collecting every distinct (normalized)
/// source and destination cluster URI referenced by the job's activities, fetching a
/// query capacity for each cluster, and building one priority execution queue per
/// cluster for queries, commands, DM commands and ingestion queuing.
/// </summary>
/// <param name="parameterization">Job parameterization whose activities list the clusters.</param>
/// <param name="credentials">Credentials handed to the provider factory.</param>
/// <param name="traceApplicationName">Application name handed to the provider factory.</param>
/// <param name="ct">Cancellation token passed to the capacity lookups.</param>
/// <returns>A factory wired with the provider factory and the per-cluster queues.</returns>
public static async Task<DbClientFactory> CreateAsync(
    MainJobParameterization parameterization,
    TokenCredential credentials,
    string traceApplicationName,
    CancellationToken ct)
{
    var providerFactory =
        new ProviderFactory(parameterization, credentials, traceApplicationName);
    var sourceClusterUris = parameterization.Activities
        .Values
        .Select(a => NormalizedUri.NormalizeUri(a.Source.ClusterUri))
        .Distinct();
    // Materialized once: this sequence is consumed three times below (the concat and
    // two queue maps); a deferred query would re-normalize and re-deduplicate each time.
    var destinationClusterUris = parameterization.Activities
        .Values
        .Select(a => NormalizedUri.NormalizeUri(a.Destination.ClusterUri))
        .Distinct()
        .ToImmutableArray();
    // A cluster can be both a source and a destination, hence the second Distinct.
    var allClusterUris = sourceClusterUris
        .Concat(destinationClusterUris)
        .Distinct();
    // Start every capacity lookup before awaiting any of them so they run concurrently.
    var queryCapacityTasks = allClusterUris
        .Select(uri => new
        {
            Task = GetQueryCapacityAsync(providerFactory.GetCommandProvider(uri), ct),
            Uri = uri
        })
        .ToImmutableArray();

    await TaskHelper.WhenAllWithErrors(queryCapacityTasks.Select(o => o.Task));

    // Materialized once: consumed by both the query and the command queue maps.
    var allClusterQueryCount = queryCapacityTasks
        .Select(o => new
        {
            o.Uri,
            // Use 10% of the reported capacity, but never fewer than one slot.
            // Reading .Result here presumes WhenAllWithErrors only returns once every
            // task has completed successfully -- TODO confirm against TaskHelper.
            ConcurrentQueryCount = (int)Math.Max(1, 0.1 * o.Task.Result)
        })
        .ToImmutableArray();
    var allClusterQueryQueueMap = allClusterQueryCount
        .ToImmutableDictionary(
            o => o.Uri,
            o => new PriorityExecutionQueue<KustoPriority>(o.ConcurrentQueryCount));
    // NOTE(review): command queues are sized from the *query* capacity as well;
    // confirm this is intended rather than a dedicated command capacity.
    var allClusterCommandQueueMap = allClusterQueryCount
        .ToImmutableDictionary(
            o => o.Uri,
            o => new PriorityExecutionQueue<KustoPriority>(o.ConcurrentQueryCount));
    // DM and ingest queues exist only for destination clusters and use fixed limits.
    var destinationClusterDmQueryQueueMap = destinationClusterUris
        .ToImmutableDictionary(
            u => u,
            u => new PriorityExecutionQueue<KustoPriority>(MAX_CONCURRENT_DM_COMMAND));
    var destinationClusterIngestCommandQueueMap = destinationClusterUris
        .ToImmutableDictionary(
            u => u,
            u => new PriorityExecutionQueue<KustoPriority>(MAX_CONCURRENT_INGEST_QUEUING));

    return new DbClientFactory(
        providerFactory,
        allClusterQueryQueueMap,
        allClusterCommandQueueMap,
        destinationClusterDmQueryQueueMap,
        destinationClusterIngestCommandQueueMap);
}