in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java [283:304]
/**
 * Builds the Kusto ingest client and, when streaming is enabled for the given
 * configuration, the managed streaming ingest client as well. The results are
 * stored in {@code kustoIngestClient} and {@code streamingIngestClient}.
 *
 * <p>If the configuration supplies a non-empty proxy host together with a proxy
 * port greater than -1, both clients are created with HTTP client properties
 * that route through that proxy; otherwise the factory overloads without HTTP
 * client properties are used.
 *
 * @param config sink configuration providing the proxy settings and the ingest
 *               and engine URLs
 * @throws ConnectException if any step of the client construction throws; the
 *                          original exception is kept as the cause
 */
public void createKustoIngestClient(KustoSinkConfig config) {
    try {
        // A proxy is only configured when both a host and a non-negative port are present.
        final boolean proxyConfigured = StringUtils.isNotEmpty(config.getConnectionProxyHost())
                && config.getConnectionProxyPort() > -1;

        HttpClientProperties proxyProperties = null;
        if (proxyConfigured) {
            proxyProperties = HttpClientProperties.builder()
                    .proxy(new HttpHost(config.getConnectionProxyHost(), config.getConnectionProxyPort()))
                    .build();
        }

        // Connection string targeting the ingest URL; reused below for the streaming client.
        final ConnectionStringBuilder ingestCsb = createKustoEngineConnectionString(config, config.getKustoIngestUrl());
        if (proxyProperties == null) {
            kustoIngestClient = IngestClientFactory.createClient(ingestCsb);
        } else {
            kustoIngestClient = IngestClientFactory.createClient(ingestCsb, proxyProperties);
        }

        if (!isStreamingEnabled(config)) {
            return;
        }

        // The managed streaming client takes both the ingest and the engine connection strings.
        final ConnectionStringBuilder engineCsb = createKustoEngineConnectionString(config, config.getKustoEngineUrl());
        if (proxyProperties == null) {
            streamingIngestClient = IngestClientFactory.createManagedStreamingIngestClient(ingestCsb, engineCsb);
        } else {
            streamingIngestClient = IngestClientFactory.createManagedStreamingIngestClient(ingestCsb, engineCsb,
                    proxyProperties);
        }
    } catch (Exception e) {
        throw new ConnectException("Failed to initialize KustoIngestClient", e);
    }
}