in firehose/firehose.go [328:365]
// processAPIResponse inspects the result of a PutRecordBatch call and brings
// the plugin's pending-record buffer (output.records / output.dataLength) in
// line with what the service reported.
//
// Outcomes:
//   - FailedPutCount is zero: output.timer.Reset() is called, the buffer is
//     emptied, and FLB_OK is returned.
//   - FailedPutCount equals the number of buffered records: output.timer.Start()
//     is called, the buffer is left untouched, and FLB_RETRY is returned
//     together with an error.
//   - Only some records failed: the buffer is trimmed down to the failed
//     records and output.dataLength is recomputed. FLB_RETRY is returned if any
//     entry reported ServiceUnavailableException, otherwise FLB_OK. The error
//     is nil in both cases.
//
// response.RequestResponses is indexed in parallel with output.records, so it
// must describe the batch that is currently buffered.
func (output *OutputPlugin) processAPIResponse(response *firehose.PutRecordBatchOutput) (int, error) {
	failedCount := aws.Int64Value(response.FailedPutCount)

	if failedCount <= 0 {
		// request fully succeeded
		output.timer.Reset()
		output.records = output.records[:0]
		output.dataLength = 0
		return fluentbit.FLB_OK, nil
	}

	// start timer if all records failed (no progress has been made)
	if failedCount == int64(len(output.records)) {
		output.timer.Start()
		return fluentbit.FLB_RETRY, fmt.Errorf("PutRecordBatch request returned with no records successfully recieved")
	}

	logrus.Warnf("[firehose %d] %d records failed to be delivered. Will retry.\n", output.PluginID, failedCount)

	// Collect every failed record before deciding on the return code. Returning
	// from inside the loop would leave already-delivered records in the buffer
	// and they would be sent a second time.
	failedRecords := make([]*firehose.Record, 0, failedCount)
	throttled := false
	for i, entry := range response.RequestResponses {
		// Treat an entry as failed when either error field is set, so that a
		// throttled entry without a message is still kept for the retry.
		if entry.ErrorCode != nil || entry.ErrorMessage != nil {
			logrus.Debugf("[firehose %d] Record failed to send with error: %s\n", output.PluginID, aws.StringValue(entry.ErrorMessage))
			failedRecords = append(failedRecords, output.records[i])
		}
		if !throttled && aws.StringValue(entry.ErrorCode) == firehose.ErrCodeServiceUnavailableException {
			logrus.Warnf("[firehose %d] Throughput limits for the delivery stream may have been exceeded.", output.PluginID)
			throttled = true
		}
	}

	// failedRecords has its own backing array, so reusing output.records'
	// storage here cannot overwrite an element that still has to be copied.
	output.records = append(output.records[:0], failedRecords...)
	output.dataLength = 0
	for _, record := range output.records {
		output.dataLength += len(record.Data)
	}

	if throttled {
		// Signal a retry so the caller backs off; only the failed records
		// remain buffered.
		return fluentbit.FLB_RETRY, nil
	}
	return fluentbit.FLB_OK, nil
}