in logstash-core/lib/logstash/java_pipeline.rb [252:338]
def start_workers
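  # clear any worker threads and output-registration state left over from a previous run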
  @worker_threads.clear
  @outputs_registered.make_false
  begin
    maybe_setup_out_plugins
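
    # resolve the worker count and batch settings for this pipeline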
    pipeline_workers = safe_pipeline_worker_count
    @preserve_event_order = preserve_event_order?(pipeline_workers)
    batch_size = settings.get("pipeline.batch.size")
    batch_delay = settings.get("pipeline.batch.delay")
    max_inflight = batch_size * pipeline_workers
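
    # record the effective configuration under this pipeline's :config metrics namespace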
    config_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :config])
    config_metric.gauge(:workers, pipeline_workers)
    config_metric.gauge(:batch_size, batch_size)
    config_metric.gauge(:batch_delay, batch_delay)
    config_metric.gauge(:config_reload_automatic, settings.get("config.reload.automatic"))
    config_metric.gauge(:config_reload_interval, settings.get("config.reload.interval").to_nanos)
    config_metric.gauge(:dead_letter_queue_enabled, dlq_enabled?)
    config_metric.gauge(:dead_letter_queue_path, dlq_writer.get_path.to_absolute_path.to_s) if dlq_enabled?
    config_metric.gauge(:ephemeral_id, ephemeral_id)
    config_metric.gauge(:hash, lir.unique_hash)
    config_metric.gauge(:graph, ::LogStash::Config::LIRSerializer.serialize(lir))

    pipeline_log_params = default_logging_keys(
      "pipeline.workers" => pipeline_workers,
      "pipeline.batch.size" => batch_size,
      "pipeline.batch.delay" => batch_delay,
      "pipeline.max_inflight" => max_inflight,
      "pipeline.sources" => pipeline_source_details)
    @logger.info("Starting pipeline", pipeline_log_params)

    filter_queue_client.set_batch_dimensions(batch_size, batch_delay)
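
    # initialize the worker loops concurrently; Thread#value blocks until each
    # init_worker_loop returns, and a nil value signals a failed initialization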
    workers_init_start = Time.now
    worker_loops = pipeline_workers.times
      .map { Thread.new { init_worker_loop } }
      .map(&:value)
    workers_init_elapsed = Time.now - workers_init_start

    fail("Some worker(s) were not correctly initialized") if worker_loops.any? {|v| v.nil?}

    @logger.info("Pipeline Java execution initialization time", "seconds" => workers_init_elapsed.round(2))
    worker_loops.each_with_index do |worker_loop, t|
      thread = WorkerLoopThread.new(worker_loop) do
        Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
        ThreadContext.put("pipeline.id", pipeline_id)
        begin
          worker_loop.run
        rescue => e
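          # flag the crash and log the underlying cause of the worker failure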
          @crash_detected.make_true
          @logger.error(
            "Pipeline worker error, the pipeline will be stopped",
            default_logging_keys(:error => e.cause.message, :exception => e.cause.class, :backtrace => e.cause.backtrace)
          )
        end
      end
      @worker_threads << thread
    end
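
    # start inputs last, once all workers are running; if they fail to start,
    # shut the workers back down before re-raising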
    begin
      start_inputs
    rescue => e
      @crash_detected.make_true
      shutdown_workers
      raise e
    end
  ensure
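    # the ensure block guarantees @ready is set even if startup raised,
    # so anything waiting on @ready is not left blocked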
    @ready.make_true
  end
end