in lib/logstash/outputs/kusto/ingestor.rb [22:89]
# Prepares the Kusto ingest client and the ingestion properties for this ingestor.
#
# Arguments are checked by +validate_config+ before any Kusto resource is created.
#
# Authentication is selected in this order:
# 1. Azure CLI, when +cli_auth+ is truthy (logged as dev/test only);
# 2. managed identity, when both +app_id+ and +app_key+ are nil: system-assigned
#    when +managed_identity_id+ equals "system" ignoring case, user-assigned
#    (using +managed_identity_id+) otherwise;
# 3. AAD application credentials built from +app_id+, +app_key.value+ and +app_tenant+.
#
# @param ingest_url [String] URL handed to the Kusto connection string builder
# @param app_id [String, nil] AAD application id
# @param app_key [#value, nil] AAD application key; its +value+ is passed to the SDK
# @param app_tenant [String, nil] AAD tenant used with the application credentials
# @param managed_identity_id [String, nil] "system" or a user-assigned identity id
# @param cli_auth [Boolean] use Azure CLI authentication
# @param database [String] target database for the ingestion properties
# @param table [String] target table for the ingestion properties
# @param json_mapping [String, nil] name of a JSON ingestion mapping; nil or empty
#   means no mapping reference is set
# @param delete_local [Object] stored as-is in +@delete_local+
# @param proxy_host [String, nil] proxy host; nil or empty means a direct connection
# @param proxy_port [Integer] proxy port (only used when a proxy host is given)
# @param proxy_protocol [String] proxy scheme (only used when a proxy host is given)
# @param logger [Object] logger used for all messages emitted here
# @param threadpool [Object] stored as-is in +@workers_pool+
def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli_auth, database, table, json_mapping, delete_local, proxy_host, proxy_port, proxy_protocol, logger, threadpool = DEFAULT_THREADPOOL)
  @workers_pool = threadpool
  @logger = logger
  validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id, cli_auth)
  @logger.info('Preparing Kusto resources.')

  kusto_java = Java::com.microsoft.azure.kusto
  apache_http = Java::org.apache.http
  connection_builder = kusto_java.data.auth.ConnectionStringBuilder

  kusto_connection_string =
    if cli_auth
      @logger.warn('*Use of CLI Auth is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production*')
      connection_builder.createWithAzureCli(ingest_url)
    elsif app_id.nil? && app_key.nil?
      # No application credentials and no CLI auth: fall back to managed identity.
      if "system".casecmp(managed_identity_id) == 0
        @logger.info('Using system managed identity.')
        connection_builder.createWithAadManagedIdentity(ingest_url)
      else
        @logger.info('Using user managed identity.')
        connection_builder.createWithAadManagedIdentity(ingest_url, managed_identity_id)
      end
    else
      @logger.info('Using app id and app key.')
      connection_builder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
    end

  @logger.debug(Gem.loaded_specs.to_s)
  # Resolve the plugin version once; it feeds both the debug line and the connector details.
  plugin_version = Gem.loaded_specs['logstash-output-kusto']&.version || "unknown"
  @logger.debug("Client name for tracing: logstash-output-kusto:#{plugin_version}")
  tuple_utils = Java::org.apache.commons.lang3.tuple
  kusto_connection_string.setConnectorDetails("Logstash", plugin_version.to_s, "", "", false, "", tuple_utils.Pair.emptyArray())

  @kusto_client =
    if proxy_host.nil? || proxy_host.empty?
      # Direct connection: no HTTP client customisation needed.
      kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string)
    else
      proxy = apache_http.HttpHost.new(proxy_host, proxy_port, proxy_protocol)
      http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(proxy).build()
      kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string, http_client_properties)
    end

  @ingestion_properties = kusto_java.ingest.IngestionProperties.new(database, table)
  if json_mapping.nil? || json_mapping.empty?
    @logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output')
  else
    # Pass the mapping name as structured data (a hash), the form Logstash's
    # logger expects for its second argument, so the name shows up in the log.
    @logger.debug('Using mapping reference.', json_mapping: json_mapping)
    @ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON)
  end
  # The data format is JSON whether or not a mapping reference was supplied.
  @ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON)

  @delete_local = delete_local
  @logger.debug('Kusto resources are ready.')
end