public void createKustoIngestClient()

in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java [283:304]


    /**
     * Initializes the ingest client(s) held by this task from the supplied configuration.
     *
     * <p>{@code kustoIngestClient} is always created, using a connection string built for
     * {@code config.getKustoIngestUrl()}. When {@code isStreamingEnabled(config)} is true,
     * {@code streamingIngestClient} is additionally created as a managed streaming client from
     * that same ingest connection string plus one built for {@code config.getKustoEngineUrl()}.
     *
     * <p>If the configuration carries a non-empty proxy host and a proxy port greater than
     * {@code -1}, every client is created with HTTP client properties that route through that
     * proxy; otherwise the factory overloads without HTTP client properties are used.
     *
     * @param config sink configuration supplying the URLs, proxy settings and the values used
     *               to build the connection strings
     * @throws ConnectException if any step of the initialization throws; the original exception
     *                          is kept as the cause
     */
    public void createKustoIngestClient(KustoSinkConfig config) {
        try {
            // Stays null when no usable proxy is configured; the null-ness selects the factory overload below.
            HttpClientProperties proxyProperties = null;
            boolean proxyConfigured = StringUtils.isNotEmpty(config.getConnectionProxyHost())
                    && config.getConnectionProxyPort() > -1;
            if (proxyConfigured) {
                proxyProperties = HttpClientProperties.builder()
                        .proxy(new HttpHost(config.getConnectionProxyHost(), config.getConnectionProxyPort()))
                        .build();
            }

            ConnectionStringBuilder ingestCsb = createKustoEngineConnectionString(config, config.getKustoIngestUrl());
            if (proxyProperties != null) {
                kustoIngestClient = IngestClientFactory.createClient(ingestCsb, proxyProperties);
            } else {
                kustoIngestClient = IngestClientFactory.createClient(ingestCsb);
            }

            // The streaming client is only needed when streaming is enabled for this configuration.
            if (!isStreamingEnabled(config)) {
                return;
            }

            ConnectionStringBuilder engineCsb = createKustoEngineConnectionString(config, config.getKustoEngineUrl());
            if (proxyProperties != null) {
                streamingIngestClient = IngestClientFactory.createManagedStreamingIngestClient(ingestCsb, engineCsb, proxyProperties);
            } else {
                streamingIngestClient = IngestClientFactory.createManagedStreamingIngestClient(ingestCsb, engineCsb);
            }
        } catch (Exception e) {
            throw new ConnectException("Failed to initialize KustoIngestClient", e);
        }
    }