submit

in lib/logstash/plugin_mixins/opensearch/common.rb [155:213]


    # Sends one batch of actions through safe_bulk and classifies every
    # per-document result of the bulk response.
    #
    # Response items are matched to +actions+ purely by position, so the two
    # collections must have the same length.
    #
    # @param actions [Array] the actions submitted in this bulk request
    # @return [Array, nil] the actions that should be submitted again, or nil
    #   when safe_bulk returned nil or the response reported no errors
    # @raise [RuntimeError] when the number of response items differs from the
    #   number of actions sent (a response without an "items" entry counts as
    #   zero items)
    def submit(actions)
      bulk_response = safe_bulk(actions)

      # No response object means there is nothing to classify or retry.
      # NOTE(review): presumably safe_bulk returns nil when it gave up
      # (e.g. during shutdown) -- confirm against safe_bulk.
      return if bulk_response.nil?

      unless bulk_response["errors"]
        # The whole request succeeded: count every document and stop here.
        @bulk_request_metrics.increment(:successes)
        @document_level_metrics.increment(:successes, actions.size)
        return
      end
      @bulk_request_metrics.increment(:with_errors)

      # A response flagged with errors but carrying no "items" is treated as
      # zero items so it takes the descriptive mismatch failure below instead
      # of raising NoMethodError on nil.
      responses = bulk_response["items"] || []
      if responses.size != actions.size
        # Positional mapping of action -> response would be unreliable.
        msg = "Sent #{actions.size} documents but OpenSearch returned #{responses.size} responses"
        @logger.warn(msg, actions: actions, responses: responses)
        fail("#{msg} (likely a bug with _bulk endpoint)")
      end

      actions_to_retry = []
      responses.each_with_index do |response, idx|
        # Each item is a single-entry hash: { action_type => properties }.
        _action_type, action_props = response.first

        status = action_props["status"]
        error  = action_props["error"]
        action = actions[idx]

        # Per-document outcome:
        # - success codes: counted, nothing else to do
        # - conflict code: logged and dropped, never retried
        # - DLQ codes: delegated to handle_dlq_status, never retried
        # - anything else: queued for another attempt
        if DOC_SUCCESS_CODES.include?(status)
          @document_level_metrics.increment(:successes)
        elsif DOC_CONFLICT_CODE == status
          @document_level_metrics.increment(:non_retryable_failures)
          @logger.warn "Failed action", status: status, action: action, response: response if log_failure_type?(error)
        elsif DOC_DLQ_CODES.include?(status)
          handle_dlq_status("Could not index event to OpenSearch.", action, status, response)
          @document_level_metrics.increment(:non_retryable_failures)
        else
          @document_level_metrics.increment(:retryable_failures)
          @logger.info "Retrying failed action", status: status, action: action, error: error if log_failure_type?(error)
          actions_to_retry << action
        end
      end

      actions_to_retry
    end