code/KustoCopyConsole/Kusto/DbClientFactory.cs (159 lines of code) (raw):

using Azure.Core; using Kusto.Cloud.Platform.Data; using Kusto.Data.Common; using KustoCopyConsole.Concurrency; using KustoCopyConsole.JobParameter; using System.Collections.Immutable; using System.Data; namespace KustoCopyConsole.Kusto { internal class DbClientFactory : IDisposable { private const int MAX_CONCURRENT_DM_COMMAND = 2; private const int MAX_CONCURRENT_INGEST_QUEUING = 25; private readonly ProviderFactory _providerFactory; private readonly IImmutableDictionary<Uri, PriorityExecutionQueue<KustoPriority>> _allClusterQueryQueueMap; private readonly IImmutableDictionary<Uri, PriorityExecutionQueue<KustoPriority>> _allClusterCommandQueueMap; private readonly IImmutableDictionary<Uri, PriorityExecutionQueue<KustoPriority>> _destinationClusterDmCommandQueueMap; private readonly IImmutableDictionary<Uri, PriorityExecutionQueue<KustoPriority>> _destinationClusterIngestCommandQueueMap; #region Constructor public static async Task<DbClientFactory> CreateAsync( MainJobParameterization parameterization, TokenCredential credentials, string traceApplicationName, CancellationToken ct) { var providerFactory = new ProviderFactory(parameterization, credentials, traceApplicationName); var sourceClusterUris = parameterization.Activities .Values .Select(a => NormalizedUri.NormalizeUri(a.Source.ClusterUri)) .Distinct(); var destinationClusterUris = parameterization.Activities .Values .Select(a => NormalizedUri.NormalizeUri(a.Destination.ClusterUri)) .Distinct(); var allClusterUris = sourceClusterUris .Concat(destinationClusterUris) .Distinct(); var queryCapacityTasks = allClusterUris .Select(uri => new { Task = GetQueryCapacityAsync(providerFactory.GetCommandProvider(uri), ct), Uri = uri }) .ToImmutableArray(); await TaskHelper.WhenAllWithErrors(queryCapacityTasks.Select(o => o.Task)); var allClusterQueryCount = queryCapacityTasks .Select(o => new { o.Uri, ConcurrentQueryCount = (int)Math.Max(1, 0.1 * o.Task.Result) }); var allClusterQueryQueueMap = allClusterQueryCount .ToImmutableDictionary( o => o.Uri, o => new PriorityExecutionQueue<KustoPriority>(o.ConcurrentQueryCount)); var allClusterCommandQueueMap = allClusterQueryCount .ToImmutableDictionary( o => o.Uri, o => new PriorityExecutionQueue<KustoPriority>(o.ConcurrentQueryCount)); var destinationClusterDmQueryQueueMap = destinationClusterUris .ToImmutableDictionary( u => u, u => new PriorityExecutionQueue<KustoPriority>(MAX_CONCURRENT_DM_COMMAND)); var destinationClusterIngestCommandQueueMap = destinationClusterUris .ToImmutableDictionary( u => u, u => new PriorityExecutionQueue<KustoPriority>(MAX_CONCURRENT_INGEST_QUEUING)); return new DbClientFactory( providerFactory, allClusterQueryQueueMap, allClusterCommandQueueMap, destinationClusterDmQueryQueueMap, destinationClusterIngestCommandQueueMap); } private DbClientFactory( ProviderFactory providerFactory, IImmutableDictionary<Uri, PriorityExecutionQueue<KustoPriority>> allClusterQueryQueueMap, IImmutableDictionary<Uri, PriorityExecutionQueue<KustoPriority>> allClusterCommandQueueMap, IImmutableDictionary<Uri, PriorityExecutionQueue<KustoPriority>> destinationClusterDmCommandQueueMap, IImmutableDictionary<Uri, PriorityExecutionQueue<KustoPriority>> destinationClusterIngestCommandQueueMap) { _providerFactory = providerFactory; _allClusterQueryQueueMap = allClusterQueryQueueMap; _allClusterCommandQueueMap = allClusterCommandQueueMap; _destinationClusterDmCommandQueueMap = destinationClusterDmCommandQueueMap; _destinationClusterIngestCommandQueueMap = destinationClusterIngestCommandQueueMap; } private static async Task<int> GetQueryCapacityAsync( ICslAdminProvider provider, CancellationToken ct) { var commandText = @" .show capacity | where Resource == 'Queries' | project Total"; var reader = await provider.ExecuteControlCommandAsync(string.Empty, commandText); var capacity = reader.ToDataSet().Tables[0].Rows .Cast<DataRow>() .Select(r => (long)r[0]) .First(); return (int)capacity; } #endregion void IDisposable.Dispose() { ((IDisposable)_providerFactory).Dispose(); } public DbQueryClient GetDbQueryClient(Uri clusterUri, string database) { try { var queue = _allClusterQueryQueueMap[clusterUri]; var provider = _providerFactory.GetQueryProvider(clusterUri); return new DbQueryClient(provider, queue, database); } catch (KeyNotFoundException ex) { throw new CopyException($"Can't find cluster '{clusterUri}'", false, ex); } } public DbCommandClient GetDbCommandClient(Uri clusterUri, string database) { try { var commandQueue = _allClusterCommandQueueMap[clusterUri]; var provider = _providerFactory.GetCommandProvider(clusterUri); return new DbCommandClient(provider, commandQueue, database); } catch (KeyNotFoundException ex) { throw new CopyException($"Can't find cluster '{clusterUri}'", false, ex); } } public DmCommandClient GetDmCommandClient(Uri clusterUri, string database) { try { var queue = _destinationClusterDmCommandQueueMap[clusterUri]; var provider = _providerFactory.GetDmCommandProvider(clusterUri); return new DmCommandClient(provider, queue, database); } catch (KeyNotFoundException ex) { throw new CopyException($"Can't find cluster '{clusterUri}'", false, ex); } } public IngestClient GetIngestClient(Uri clusterUri, string database, string table) { try { var queue = _destinationClusterIngestCommandQueueMap[clusterUri]; var ingestProvider = _providerFactory.GetIngestProvider(clusterUri); var ingestClient = new IngestClient(ingestProvider, queue, database, table); return ingestClient; } catch (KeyNotFoundException ex) { throw new CopyException($"Can't find cluster '{clusterUri}'", false, ex); } } } }