start_workers

in logstash-core/lib/logstash/java_pipeline.rb [252:338]


  # Starts the worker side of the pipeline, then its inputs.
  #
  # Sequence, as implemented below:
  #   1. reset the worker thread list and the outputs-registered flag;
  #   2. run +maybe_setup_out_plugins+;
  #   3. publish the effective configuration as gauges under
  #      [:stats, :pipelines, <pipeline id>, :config] and log "Starting pipeline";
  #   4. build one worker loop per worker, each in its own thread, and wait for all;
  #   5. run every worker loop in a WorkerLoopThread stored in @worker_threads;
  #   6. call +start_inputs+.
  #
  # @ready is set to true when the method exits, whether it succeeded or raised.
  #
  # @raise [RuntimeError] if any worker loop initializer returned nil
  # @raise [StandardError] whatever +start_inputs+ raised, re-raised after
  #   +shutdown_workers+ has been called
  def start_workers
    @worker_threads.clear # start from an empty thread list
    @outputs_registered.make_false
    begin
      maybe_setup_out_plugins

      pipeline_workers = safe_pipeline_worker_count
      @preserve_event_order = preserve_event_order?(pipeline_workers)
      batch_size = settings.get("pipeline.batch.size")
      batch_delay = settings.get("pipeline.batch.delay")

      # Upper bound on events held by workers at once: one full batch per worker.
      max_inflight = batch_size * pipeline_workers

      config_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :config])
      config_metric.gauge(:workers, pipeline_workers)
      config_metric.gauge(:batch_size, batch_size)
      config_metric.gauge(:batch_delay, batch_delay)
      config_metric.gauge(:config_reload_automatic, settings.get("config.reload.automatic"))
      config_metric.gauge(:config_reload_interval, settings.get("config.reload.interval").to_nanos)
      config_metric.gauge(:dead_letter_queue_enabled, dlq_enabled?)
      config_metric.gauge(:dead_letter_queue_path, dlq_writer.get_path.to_absolute_path.to_s) if dlq_enabled?
      config_metric.gauge(:ephemeral_id, ephemeral_id)
      config_metric.gauge(:hash, lir.unique_hash)
      config_metric.gauge(:graph, ::LogStash::Config::LIRSerializer.serialize(lir))

      pipeline_log_params = default_logging_keys(
        "pipeline.workers" => pipeline_workers,
        "pipeline.batch.size" => batch_size,
        "pipeline.batch.delay" => batch_delay,
        "pipeline.max_inflight" => max_inflight,
        "pipeline.sources" => pipeline_source_details)
      @logger.info("Starting pipeline", pipeline_log_params)

      filter_queue_client.set_batch_dimensions(batch_size, batch_delay)

      # Build the worker loops concurrently: one initializer thread per worker.
      # Thread#value joins each thread and returns the block's result (and
      # re-raises anything the initializer raised).
      # A monotonic clock is used so wall-clock adjustments cannot skew the
      # reported duration.
      workers_init_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      worker_loops = pipeline_workers.times
        .map { Thread.new { init_worker_loop } }
        .map(&:value)
      workers_init_elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - workers_init_start

      fail("Some worker(s) were not correctly initialized") if worker_loops.any?(&:nil?)

      @logger.info("Pipeline Java execution initialization time", "seconds" => workers_init_elapsed.round(2))

      # Every worker loop is initialized at this point; run each in its own thread.
      worker_loops.each_with_index do |worker_loop, index|
        thread = WorkerLoopThread.new(worker_loop) do
          Util.set_thread_name("[#{pipeline_id}]>worker#{index}")
          ThreadContext.put("pipeline.id", pipeline_id)
          begin
            worker_loop.run
          rescue => e
            # NOTE(review): the log fields are taken from the error's #cause,
            # presumably because WorkerLoop#run wraps the underlying failure in
            # another exception -- confirm against WorkerLoop.
            # Exception#cause is nil for an unwrapped error, so fall back to the
            # error itself; otherwise this handler would raise NoMethodError and
            # the log entry below would be lost.
            root_error = e.cause || e
            @crash_detected.make_true
            @logger.error(
              "Pipeline worker error, the pipeline will be stopped",
              default_logging_keys(:error => root_error.message, :exception => root_error.class, :backtrace => root_error.backtrace)
            )
          end
        end
        @worker_threads << thread
      end

      # Inputs are started last, once all worker threads have been created.
      begin
        start_inputs
      rescue => e
        @crash_detected.make_true
        # Inputs failed to start: shut the already-running workers down before
        # handing the original error back to the caller.
        shutdown_workers
        raise e
      end
    ensure
      # Always flag the startup sequence as finished, on success and on failure.
      # NOTE(review): presumably other code waits on @ready (e.g. shutdown) --
      # confirm against its readers.
      @ready.make_true
    end
  end