in lib/logstash/plugin_mixins/opensearch/common.rb [155:213]
# Sends +actions+ in one bulk request via +safe_bulk+ and classifies the
# per-document results, updating the bulk-level and document-level metrics.
#
# @param actions [Array] the actions that were submitted; the response item
#   at index i is matched to +actions[i]+.
# @return [Array, nil] the actions whose status is neither a success code,
#   the conflict code, nor a DLQ code (collected for retry); +nil+ when
#   +safe_bulk+ returned nil or the bulk response reported no errors.
# @raise [RuntimeError] when the number of returned items differs from the
#   number of actions sent, since results can then not be mapped to actions.
def submit(actions)
  bulk_response = safe_bulk(actions)
  return if bulk_response.nil?

  unless bulk_response["errors"]
    # No per-document errors reported: count the whole batch as successful.
    @bulk_request_metrics.increment(:successes)
    @document_level_metrics.increment(:successes, actions.size)
    return
  end
  @bulk_request_metrics.increment(:with_errors)

  responses = bulk_response["items"]
  if responses.size != actions.size
    # Positional matching below would be unreliable, so abort instead.
    msg = "Sent #{actions.size} documents but OpenSearch returned #{responses.size} responses"
    @logger.warn(msg, actions: actions, responses: responses)
    fail("#{msg} (likely a bug with _bulk endpoint)")
  end

  actions_to_retry = []
  responses.each_with_index do |response, idx|
    # `response.first` yields the [action type, properties] pair of the item;
    # only the properties are needed here.
    _action_type, action_props = response.first
    status = action_props["status"]
    error = action_props["error"]
    action = actions[idx]

    if DOC_SUCCESS_CODES.include?(status)
      @document_level_metrics.increment(:successes)
    elsif DOC_CONFLICT_CODE == status
      # Conflicts are dropped, not retried; logging is gated by log_failure_type?.
      @document_level_metrics.increment(:non_retryable_failures)
      @logger.warn "Failed action", status: status, action: action, response: response if log_failure_type?(error)
    elsif DOC_DLQ_CODES.include?(status)
      # Handed to handle_dlq_status and not retried.
      handle_dlq_status("Could not index event to OpenSearch.", action, status, response)
      @document_level_metrics.increment(:non_retryable_failures)
    else
      # Any other status is treated as retryable.
      @document_level_metrics.increment(:retryable_failures)
      @logger.info "Retrying failed action", status: status, action: action, error: error if log_failure_type?(error)
      actions_to_retry << action
    end
  end
  actions_to_retry
end