in lib/logstash/outputs/amazon_es/common.rb [278:346]
# Sends one batch of actions through @client.bulk, retrying with a growing
# sleep interval until the request succeeds or the plugin is shutting down.
#
# Every failure increments the :failures counter on @bulk_request_metrics.
#
# @param actions [Array<Array>] triples of [action_type, params, event]; each
#   event is converted with #to_hash before being handed to the client
# @return [Object, nil] whatever @client.bulk returned, or nil when a retry
#   was abandoned because @stopping.true? reported true
def safe_bulk(actions)
  sleep_interval = @retry_initial_interval

  # Keep an explicit begin block instead of a method-level rescue: `retry`
  # must re-run only the request so the grown sleep_interval is preserved.
  begin
    es_actions = actions.map { |action_type, params, event| [action_type, params, event.to_hash] }
    @client.bulk(es_actions)
  rescue ::LogStash::Outputs::AmazonElasticSearch::HttpClient::Pool::HostUnreachableError => e
    @logger.error(
      "Attempted to send a bulk request to elasticsearch" \
      " but Elasticsearch appears to be unreachable or down!",
      :error_message => e.message,
      :class => e.class.name,
      :will_retry_in_seconds => sleep_interval
    )
    @logger.debug("Failed actions for last bad bulk request!", :actions => actions)

    # NOTE(review): sleep_for_interval is defined outside this block; it is
    # used here as "wait, then return the next interval" -- confirm.
    sleep_interval = sleep_for_interval(sleep_interval)
    @bulk_request_metrics.increment(:failures)
    retry unless @stopping.true?
  rescue ::LogStash::Outputs::AmazonElasticSearch::HttpClient::Pool::NoConnectionAvailableError => e
    @logger.error(
      "Attempted to send a bulk request to elasticsearch, but there are no living connections in the connection pool. Perhaps Elasticsearch is unreachable or down?",
      :error_message => e.message,
      :class => e.class.name,
      :will_retry_in_seconds => sleep_interval
    )

    # The stop predicate is passed as the block so the wait can be cut short
    # once the plugin starts stopping.
    Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
    sleep_interval = next_sleep_interval(sleep_interval)
    @bulk_request_metrics.increment(:failures)
    retry unless @stopping.true?
  rescue ::LogStash::Outputs::AmazonElasticSearch::HttpClient::Pool::BadResponseCodeError => e
    @bulk_request_metrics.increment(:failures)
    log_hash = {:code => e.response_code, :url => e.url.sanitized.to_s}
    # The response body is only attached when debug logging is enabled.
    log_hash[:body] = e.response_body if @logger.debug?
    message = "Encountered a retryable error. Will Retry with exponential backoff "

    # A 429 (Too Many Requests) is logged at debug level; every other
    # response code that lands here is logged as an error.
    if e.response_code == 429
      @logger.debug(message, log_hash)
    else
      @logger.error(message, log_hash)
    end

    sleep_interval = sleep_for_interval(sleep_interval)
    # Honour shutdown like the other branches; an unconditional retry would
    # keep looping for as long as the cluster keeps answering 429/503.
    retry unless @stopping.true?
  rescue => e
    @logger.error(
      "An unknown error occurred sending a bulk request to Elasticsearch. We will retry indefinitely",
      :error_message => e.message,
      :error_class => e.class.name,
      :backtrace => e.backtrace
    )
    @logger.debug("Failed actions for last bad bulk request!", :actions => actions)
    sleep_interval = sleep_for_interval(sleep_interval)
    @bulk_request_metrics.increment(:failures)
    retry unless @stopping.true?
  end
end