in lib/logstash/outputs/kusto.rb [123:173]
# Sets up the state this output needs before use: the file bookkeeping
# structures, the resolved temp-file path and failure path, the upload thread
# pool and ingestor, and the optional background flush / stale-cleanup timers.
#
# Statement order matters: @path must be assigned before validate_path and
# before @file_root is derived, and @ingestor must exist before any recovery
# of past files is attempted.
def register
  require 'fileutils'

  @files = {}
  @io_mutex = Mutex.new

  # Prefer the JSON mapping; fall back to `mapping` when it is nil or empty.
  ingestion_mapping = json_mapping
  ingestion_mapping = mapping if ingestion_mapping.nil? || ingestion_mapping.empty?

  # With dynamic routing the path keeps %{...} field references (presumably
  # resolved per event elsewhere -- confirm against the write path);
  # otherwise the configured database and table are appended directly.
  unexpanded_path =
    if dynamic_event_routing
      "#{path}.%{[@metadata][database]}.%{[@metadata][table]}.%{[@metadata][final_mapping]}"
    else
      "#{path}.#{database}.#{table}"
    end
  @path = File.expand_path(unexpanded_path)

  validate_path

  @file_root = path_with_field_ref? ? extract_file_root : File.dirname(path)
  @failure_path = File.join(@file_root, @filename_failure)

  # Bounded pool for uploads; :caller_runs makes the submitting thread do the
  # work itself once the queue is full instead of rejecting the task.
  upload_pool = Concurrent::ThreadPoolExecutor.new(
    min_threads: 1,
    max_threads: upload_concurrent_count,
    max_queue: upload_queue_size,
    fallback_policy: :caller_runs
  )

  @ingestor = Ingestor.new(
    ingest_url, app_id, app_key, app_tenant, managed_identity, cli_auth,
    database, table, ingestion_mapping, delete_temp_files,
    proxy_host, proxy_port, proxy_protocol, @logger, upload_pool
  )

  # Only run the recovery step when the `recovery` option is enabled.
  recover_past_files if recovery

  @last_stale_cleanup_cycle = Time.now

  # Coerce to an Integer so the comparison below is numeric.
  @flush_interval = @flush_interval.to_i
  @flusher = Interval.start(@flush_interval, -> { flush_pending_files }) if @flush_interval > 0

  # The stale-file cleaner is only started for the 'interval' cleanup type
  # with a positive interval.
  if @stale_cleanup_type == 'interval' && @stale_cleanup_interval > 0
    @cleaner = Interval.start(stale_cleanup_interval, -> { close_stale_files })
  end
end