in lib/logstash/plugin_mixins/opensearch/common.rb [221:278]
def safe_bulk(actions)
sleep_interval = @retry_initial_interval
begin
@client.bulk(actions)
rescue ::LogStash::Outputs::OpenSearch::HttpClient::Pool::HostUnreachableError => e
@logger.error(
"Attempted to send a bulk request but OpenSearch appears to be unreachable or down",
message: e.message, exception: e.class, will_retry_in_seconds: sleep_interval
)
@logger.debug? && @logger.debug("Failed actions for last bad bulk request", :actions => actions)
sleep_interval = sleep_for_interval(sleep_interval)
@bulk_request_metrics.increment(:failures)
retry unless @stopping.true?
rescue ::LogStash::Outputs::OpenSearch::HttpClient::Pool::NoConnectionAvailableError => e
@logger.error(
"Attempted to send a bulk request but there are no living connections in the pool " +
"(perhaps OpenSearch is unreachable or down?)",
message: e.message, exception: e.class, will_retry_in_seconds: sleep_interval
)
sleep_interval = sleep_for_interval(sleep_interval)
@bulk_request_metrics.increment(:failures)
retry unless @stopping.true?
rescue ::LogStash::Outputs::OpenSearch::HttpClient::Pool::BadResponseCodeError => e
@bulk_request_metrics.increment(:failures)
log_hash = {:code => e.response_code, :url => e.url.sanitized.to_s, :content_length => e.request_body.bytesize}
log_hash[:body] = e.response_body if @logger.debug?
message = "Encountered a retryable error (will retry with exponential backoff)"
if e.response_code == 429
logger.debug(message, log_hash)
else
logger.error(message, log_hash)
end
sleep_interval = sleep_for_interval(sleep_interval)
retry
rescue => e
@logger.error(
"An unknown error occurred sending a bulk request to OpenSearch (will retry indefinitely)",
message: e.message, exception: e.class, backtrace: e.backtrace
)
@logger.debug? && @logger.debug("Failed actions for last bad bulk request", :actions => actions)
sleep_interval = sleep_for_interval(sleep_interval)
@bulk_request_metrics.increment(:failures)
retry unless @stopping.true?
end
end