code/KustoCopyConsole/Kusto/ProviderFactory.cs (113 lines of code) (raw):
using Azure.Core;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest;
using KustoCopyConsole.JobParameter;
using System.Collections.Immutable;
namespace KustoCopyConsole.Kusto
{
internal class ProviderFactory : IDisposable
{
private const string APPLICATION_NAME = "KustoCopy";
private readonly ImmutableDictionary<Uri, ICslQueryProvider> _queryProviderMap;
private readonly ImmutableDictionary<Uri, ICslAdminProvider> _commandProviderMap;
private readonly ImmutableDictionary<Uri, ICslAdminProvider> _dmCommandProviderMap;
private readonly ImmutableDictionary<Uri, IKustoQueuedIngestClient> _ingestProviderMap;
#region Constructor
public ProviderFactory(
MainJobParameterization parameterization,
TokenCredential credentials,
string traceApplicationName)
{
var sourceClusterUris = parameterization.Activities
.Values
.Select(a => NormalizedUri.NormalizeUri(a.Source.ClusterUri))
.Distinct();
var destinationClusterUris = parameterization.Activities
.Values
.Select(a => NormalizedUri.NormalizeUri(a.Destination.ClusterUri))
.Distinct();
var allClusterUris = sourceClusterUris
.Concat(destinationClusterUris)
.Distinct();
var sourceBuilders = sourceClusterUris
.Select(uri => new
{
Uri = uri,
Builder = CreateBuilder(credentials, uri, traceApplicationName)
});
var destinationIngestionBuilders = destinationClusterUris
.Select(uri => new
{
Uri = uri,
Builder = CreateBuilder(credentials, GetIngestUri(uri), traceApplicationName)
});
var allBuilders = allClusterUris
.Select(uri => new
{
Uri = uri,
Builder = CreateBuilder(credentials, uri, traceApplicationName)
});
_queryProviderMap = allBuilders
.ToImmutableDictionary(
e => e.Uri,
e => KustoClientFactory.CreateCslQueryProvider(e.Builder));
_commandProviderMap = allBuilders
.ToImmutableDictionary(
e => e.Uri,
e => KustoClientFactory.CreateCslAdminProvider(e.Builder));
_dmCommandProviderMap = destinationIngestionBuilders
.ToImmutableDictionary(
e => e.Uri,
e => KustoClientFactory.CreateCslAdminProvider(e.Builder));
_ingestProviderMap = destinationIngestionBuilders
.ToImmutableDictionary(
e => e.Uri,
e => KustoIngestFactory.CreateQueuedIngestClient(e.Builder));
}
private static KustoConnectionStringBuilder CreateBuilder(
TokenCredential credentials,
Uri uri,
string traceApplicationName)
{
var builder = new KustoConnectionStringBuilder(uri.ToString())
.WithAadAzureTokenCredentialsAuthentication(credentials);
builder.ApplicationNameForTracing = traceApplicationName;
return builder;
}
#endregion
void IDisposable.Dispose()
{
var disposables = _commandProviderMap.Values.Cast<IDisposable>()
.Concat(_queryProviderMap.Values)
.Concat(_dmCommandProviderMap.Values);
foreach (var disposable in disposables)
{
disposable.Dispose();
}
}
public ICslQueryProvider GetQueryProvider(Uri clusterUri)
{
return _queryProviderMap[clusterUri];
}
public ICslAdminProvider GetCommandProvider(Uri clusterUri)
{
return _commandProviderMap[clusterUri];
}
public ICslAdminProvider GetDmCommandProvider(Uri clusterUri)
{
return _dmCommandProviderMap[clusterUri];
}
public IKustoQueuedIngestClient GetIngestProvider(Uri clusterUri)
{
return _ingestProviderMap[clusterUri];
}
private static Uri GetIngestUri(Uri uri)
{
var builder = new UriBuilder(uri);
builder.Host = $"ingest-{uri.Host}";
return builder.Uri;
}
}
}