in lib/fluent/plugin/kinesis_helper/api.rb [171:209]
# Final handling for records that are still failing once batch-request
# retries have been given up on.
#
# Every failed record is first written to the error log (the message is
# passed through +truncate+). Then one of two things happens:
#
# * If @drop_failed_records_after_batch_request_retries is truthy, only
#   +increment_num_errors+ is called and its result is returned.
# * Otherwise an AWS SDK error is raised. According to the log message
#   emitted just before the raise, this is meant to hand the chunk back to
#   the Fluentd buffer for retrying.
#
# Which error is raised depends on +request_type+:
#
# * :streams / :streams_aggregated - a record whose error code is
#   'ProvisionedThroughputExceededException' is preferred and raised as
#   Aws::Kinesis::Errors::ProvisionedThroughputExceededException; without
#   such a record the first failed record is raised as
#   Aws::Kinesis::Errors::ServiceError.
# * :firehose - same scheme with 'ServiceUnavailableException' and the
#   Aws::Firehose::Errors classes.
#
# @param failed_records [Array] entries read via +[:error_code]+,
#   +[:error_message]+ and +[:original]+ (presumably hashes built by the
#   caller - confirm against the code that collects failed records)
# @return the result of +increment_num_errors+ when records are dropped
# @raise [Aws::Kinesis::Errors::ServiceError, Aws::Firehose::Errors::ServiceError]
#   (or the more specific classes named above) when records are not dropped
def give_up_retries(failed_records)
  failed_records.each do |failed|
    details = [failed[:error_code], failed[:error_message], failed[:original]]
    log.error(truncate('Could not put record, Error: %s/%s, Record: %s' % details))
  end

  return increment_num_errors if @drop_failed_records_after_batch_request_retries

  # NOTE(review): there is no fallback branch. A request_type other than the
  # three handled here leaves both locals nil, and the log line below then
  # fails with NoMethodError. The error-class constants are deliberately only
  # referenced inside the branch that needs them.
  chosen_record, error_class =
    case request_type
    when :streams, :streams_aggregated
      throttled = failed_records.find { |failed| failed[:error_code] == 'ProvisionedThroughputExceededException' }
      [
        throttled || failed_records.first,
        throttled ? Aws::Kinesis::Errors::ProvisionedThroughputExceededException : Aws::Kinesis::Errors::ServiceError
      ]
    when :firehose
      unavailable = failed_records.find { |failed| failed[:error_code] == 'ServiceUnavailableException' }
      [
        unavailable || failed_records.first,
        unavailable ? Aws::Firehose::Errors::ServiceUnavailableException : Aws::Firehose::Errors::ServiceError
      ]
    end

  log.error("Raise #{chosen_record[:error_code]} and return chunk to Fluentd buffer for retrying")
  raise error_class.new(Seahorse::Client::RequestContext.new, chosen_record[:error_message])
end