in lib/logstash/outputs/amazon_es/common.rb [186:236]
# Sends +actions+ through safe_bulk and classifies each item of the response.
#
# When the response reports errors, every item is matched by position with
# the action that produced it and handled according to its status:
# * included in DOC_SUCCESS_CODES -> counted as a document-level success
# * equal to DOC_CONFLICT_CODE    -> counted as non-retryable, logged as a warning
# * included in DOC_DLQ_CODES     -> passed to handle_dlq_status, counted as non-retryable
# * anything else                 -> counted as retryable and collected for the caller
#
# Log lines are skipped when the item's error type is listed in
# failure_type_logging_whitelist. An item that carries no "error" entry is
# treated as having no error type (so it is logged) instead of raising
# NoMethodError and aborting the whole batch.
#
# @param actions [Array] the actions to submit
# @return [Array, nil] the actions to retry, or nil when safe_bulk returned
#   nil or the response reported no errors
def submit(actions)
  bulk_response = safe_bulk(actions)
  return if bulk_response.nil?

  unless bulk_response["errors"]
    @bulk_request_metrics.increment(:successes)
    @document_level_metrics.increment(:successes, actions.size)
    return
  end
  @bulk_request_metrics.increment(:with_errors)

  actions_to_retry = []
  bulk_response["items"].each_with_index do |response, idx|
    # Each item is a single-entry hash: { action_type => properties }.
    _action_type, action_props = response.first
    status = action_props["status"]
    failure = action_props["error"]
    # The "error" entry may be absent; never index into nil.
    failure_type = failure && failure["type"]
    action = actions[idx]

    if DOC_SUCCESS_CODES.include?(status)
      @document_level_metrics.increment(:successes)
    elsif DOC_CONFLICT_CODE == status
      @document_level_metrics.increment(:non_retryable_failures)
      unless failure_type_logging_whitelist.include?(failure_type)
        @logger.warn "Failed action.", status: status, action: action, response: response
      end
    elsif DOC_DLQ_CODES.include?(status)
      handle_dlq_status("Could not index event to Elasticsearch.", action, status, response)
      @document_level_metrics.increment(:non_retryable_failures)
    else
      @document_level_metrics.increment(:retryable_failures)
      unless failure_type_logging_whitelist.include?(failure_type)
        @logger.info "retrying failed action with response code: #{status} (#{failure})"
      end
      actions_to_retry << action
    end
  end

  actions_to_retry
end