in kinesis/kinesis.go [519:563]
// processAPIResponse inspects the result of a PutRecords call and rewrites the
// caller's pending batch so that it holds only the records that still need to
// be delivered.
//
// records and dataLength are in/out parameters. On entry *records must be the
// batch that was sent in the request that produced response. On return:
//   - full success: *records is emptied and *dataLength is set to 0;
//   - partial failure: *records holds only the failed entries (original order
//     preserved, backing array reused) and *dataLength is the sum of their
//     Data lengths;
//   - total failure: both are left untouched so the whole batch can be retried.
//
// The returned code is FLB_RETRY when every record failed or when at least one
// record was rejected with ProvisionedThroughputExceededException; otherwise it
// is FLB_OK. A non-nil error is returned only when every record failed.
func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutRecordsRequestEntry, dataLength *int, response *kinesis.PutRecordsOutput) (int, error) {
	failedCount := aws.Int64Value(response.FailedRecordCount)

	if failedCount <= 0 {
		// request fully succeeded
		outputPlugin.timer.Reset()
		*records = (*records)[:0]
		*dataLength = 0
		return fluentbit.FLB_OK, nil
	}

	sent := *records

	// start timer if all records failed (no progress has been made)
	if failedCount == int64(len(sent)) {
		outputPlugin.timer.Start()
		return fluentbit.FLB_RETRY, fmt.Errorf("PutRecords request returned with no records successfully received")
	}

	logrus.Warnf("[kinesis %d] %d/%d records failed to be delivered. Will retry.\n", outputPlugin.PluginID, failedCount, len(sent))

	retCode := fluentbit.FLB_OK
	limitsExceeded := false

	// Filter the batch in place. The write position never passes the read
	// position i, so an entry is always read before its slot can be reused.
	pending := sent[:0]
	pendingBytes := 0
	for i, result := range response.Records {
		if i >= len(sent) {
			// Result entries are matched to request entries by position; a
			// result with no corresponding request entry cannot be retried.
			break
		}
		// A failed result carries an error code and message; treat either one
		// as a failure marker so a record is never dropped while its error
		// code still triggers a retry below.
		if result == nil || (result.ErrorCode == nil && result.ErrorMessage == nil) {
			continue
		}

		logrus.Debugf("[kinesis %d] Record failed to send with error: %s\n", outputPlugin.PluginID, aws.StringValue(result.ErrorMessage))
		pending = append(pending, sent[i])
		pendingBytes += len(sent[i].Data)

		if aws.StringValue(result.ErrorCode) == kinesis.ErrCodeProvisionedThroughputExceededException {
			retCode = fluentbit.FLB_RETRY
			limitsExceeded = true
		}
	}

	if limitsExceeded {
		logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID)
	}

	// try to resend failed records: hand the trimmed batch back to the caller
	*records = pending
	*dataLength = pendingBytes
	return retCode, nil
}