safe_bulk

in lib/logstash/plugin_mixins/opensearch/common.rb [221:278]


    # Submits +actions+ through @client.bulk and keeps re-issuing the request
    # whenever it raises, waiting between attempts.
    #
    # The wait starts at @retry_initial_interval; after every failure it is
    # replaced by whatever sleep_for_interval returns for the current value.
    # Every rescued failure increments the :failures counter on
    # @bulk_request_metrics.
    #
    # @param actions the bulk payload, handed to @client.bulk unchanged
    # @return the value of @client.bulk on success, or nil when a failed attempt
    #   is not retried because @stopping.true? is truthy
    def safe_bulk(actions)
      backoff = @retry_initial_interval
      # Explicit begin block: `retry` must restart only the request, not the
      # initialisation of `backoff` above.
      begin
        @client.bulk(actions)
      rescue ::LogStash::Outputs::OpenSearch::HttpClient::Pool::HostUnreachableError => err
        @logger.error(
          "Attempted to send a bulk request but OpenSearch appears to be unreachable or down",
          :message => err.message, :exception => err.class, :will_retry_in_seconds => backoff
        )
        # The payload is only logged when debug logging is enabled.
        @logger.debug("Failed actions for last bad bulk request", :actions => actions) if @logger.debug?

        backoff = sleep_for_interval(backoff)
        @bulk_request_metrics.increment(:failures)
        retry unless @stopping.true?
      rescue ::LogStash::Outputs::OpenSearch::HttpClient::Pool::NoConnectionAvailableError => err
        @logger.error(
          "Attempted to send a bulk request but there are no living connections in the pool " \
          "(perhaps OpenSearch is unreachable or down?)",
          :message => err.message, :exception => err.class, :will_retry_in_seconds => backoff
        )

        backoff = sleep_for_interval(backoff)
        @bulk_request_metrics.increment(:failures)
        retry unless @stopping.true?
      rescue ::LogStash::Outputs::OpenSearch::HttpClient::Pool::BadResponseCodeError => err
        @bulk_request_metrics.increment(:failures)
        details = {
          code: err.response_code,
          url: err.url.sanitized.to_s,
          content_length: err.request_body.bytesize
        }
        # The response body is attached only when debug logging is enabled.
        details[:body] = err.response_body if @logger.debug?
        notice = "Encountered a retryable error (will retry with exponential backoff)"

        # A 429 is reported at debug level; every other code is reported as an error.
        err.response_code == 429 ? logger.debug(notice, details) : logger.error(notice, details)

        backoff = sleep_for_interval(backoff)
        # Unlike the other branches, this one retries without consulting @stopping.
        retry
      rescue => err
        # Catch-all for any other StandardError raised by the request.
        @logger.error(
          "An unknown error occurred sending a bulk request to OpenSearch (will retry indefinitely)",
          :message => err.message, :exception => err.class, :backtrace => err.backtrace
        )
        @logger.debug("Failed actions for last bad bulk request", :actions => actions) if @logger.debug?

        backoff = sleep_for_interval(backoff)
        @bulk_request_metrics.increment(:failures)
        retry unless @stopping.true?
      end
    end