give_up_retries

in lib/fluent/plugin/kinesis_helper/api.rb [171:209]


          def give_up_retries(failed_records)
            failed_records.each {|record|
              log.error(truncate 'Could not put record, Error: %s/%s, Record: %s' % [
                record[:error_code],
                record[:error_message],
                record[:original]
              ])
            }

            if @drop_failed_records_after_batch_request_retries
              
              increment_num_errors
            else
              
              case request_type
              
              
              
              when :streams, :streams_aggregated
                provisioned_throughput_exceeded_records = failed_records.select { |record| record[:error_code] == 'ProvisionedThroughputExceededException' }
                target_failed_record = provisioned_throughput_exceeded_records.first || failed_records.first
                target_error = provisioned_throughput_exceeded_records.empty? ?
                  Aws::Kinesis::Errors::ServiceError :
                  Aws::Kinesis::Errors::ProvisionedThroughputExceededException
              
              
              
              when :firehose
                service_unavailable_exception_records = failed_records.select { |record| record[:error_code] == 'ServiceUnavailableException' }
                target_failed_record = service_unavailable_exception_records.first || failed_records.first
                target_error = service_unavailable_exception_records.empty? ?
                  Aws::Firehose::Errors::ServiceError :
                  Aws::Firehose::Errors::ServiceUnavailableException
              end
              log.error("Raise #{target_failed_record[:error_code]} and return chunk to Fluentd buffer for retrying")
              raise target_error.new(Seahorse::Client::RequestContext.new, target_failed_record[:error_message])
            end
          end