safe_bulk

in lib/logstash/outputs/amazon_es/common.rb [278:346]


    # Sends +actions+ to Elasticsearch via @client.bulk, retrying on failure.
    #
    # Each action is an [action_type, params, event] triple; the event is
    # converted with #to_hash before the batch is handed to the client.
    #
    # The wait between attempts starts at @retry_initial_interval and is
    # replaced after every failure by the value returned from
    # sleep_for_interval / next_sleep_interval (presumably a growing backoff;
    # those helpers are defined elsewhere).
    #
    # Failure handling per rescued error class:
    # * HostUnreachableError, NoConnectionAvailableError and any other
    #   StandardError: log, wait, count a failure, then retry unless
    #   @stopping is true.
    # * BadResponseCodeError: count a failure, log (debug for HTTP 429, error
    #   for any other code), wait, then retry.
    #
    # @param actions [Array<Array>] [action_type, params, event] triples
    # @return [Object, nil] whatever @client.bulk returns, or nil when a
    #   rescue branch declines to retry because @stopping is true
    def safe_bulk(actions)
      sleep_interval = @retry_initial_interval
      # An explicit begin block (not a method-level rescue) is required here:
      # `retry` re-runs only this block, so sleep_interval keeps its current
      # value instead of being reset to @retry_initial_interval.
      begin
        es_actions = actions.map { |action_type, params, event| [action_type, params, event.to_hash] }
        @client.bulk(es_actions)
      rescue ::LogStash::Outputs::AmazonElasticSearch::HttpClient::Pool::HostUnreachableError => e
        @logger.error(
          "Attempted to send a bulk request to elasticsearch," \
            " but Elasticsearch appears to be unreachable or down!",
          :error_message => e.message,
          :class => e.class.name,
          :will_retry_in_seconds => sleep_interval
        )
        @logger.debug("Failed actions for last bad bulk request!", :actions => actions)

        sleep_interval = sleep_for_interval(sleep_interval)
        @bulk_request_metrics.increment(:failures)
        retry unless @stopping.true?
      rescue ::LogStash::Outputs::AmazonElasticSearch::HttpClient::Pool::NoConnectionAvailableError => e
        @logger.error(
          "Attempted to send a bulk request to elasticsearch, but there are no living connections in the connection pool. Perhaps Elasticsearch is unreachable or down?",
          :error_message => e.message,
          :class => e.class.name,
          :will_retry_in_seconds => sleep_interval
        )
        # Sleep that can be cut short once @stopping becomes true.
        Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
        sleep_interval = next_sleep_interval(sleep_interval)
        @bulk_request_metrics.increment(:failures)
        retry unless @stopping.true?
      rescue ::LogStash::Outputs::AmazonElasticSearch::HttpClient::Pool::BadResponseCodeError => e
        @bulk_request_metrics.increment(:failures)
        log_hash = {:code => e.response_code, :url => e.url.sanitized.to_s}
        # The response body is only attached when debug logging is enabled.
        log_hash[:body] = e.response_body if @logger.debug?
        message = "Encountered a retryable error. Will Retry with exponential backoff "

        # HTTP 429 is logged at debug level; every other response code that
        # reaches this branch is logged as an error.
        if e.response_code == 429
          @logger.debug(message, log_hash)
        else
          @logger.error(message, log_hash)
        end

        sleep_interval = sleep_for_interval(sleep_interval)
        # NOTE(review): unlike the other branches, this retry is not gated on
        # @stopping.true? -- confirm that retrying during shutdown is intended.
        retry
      rescue => e
        @logger.error(
          "An unknown error occurred sending a bulk request to Elasticsearch. We will retry indefinitely",
          :error_message => e.message,
          :error_class => e.class.name,
          :backtrace => e.backtrace
        )

        @logger.debug("Failed actions for last bad bulk request!", :actions => actions)

        sleep_interval = sleep_for_interval(sleep_interval)
        @bulk_request_metrics.increment(:failures)
        retry unless @stopping.true?
      end
    end