in kinesis/kinesis.go [493:515]
func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecordsRequestEntry, dataLength *int) (int, error) {
if len(*records) == 0 {
return fluentbit.FLB_OK, nil
}
outputPlugin.timer.Check()
response, err := outputPlugin.client.PutRecords(&kinesis.PutRecordsInput{
Records: *records,
StreamName: aws.String(outputPlugin.stream),
})
if err != nil {
logrus.Errorf("[kinesis %d] PutRecords failed with %v\n", outputPlugin.PluginID, err)
outputPlugin.timer.Start()
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == kinesis.ErrCodeProvisionedThroughputExceededException {
logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID)
}
}
return fluentbit.FLB_RETRY, err
}
logrus.Debugf("[kinesis %d] Sent %d events to Kinesis\n", outputPlugin.PluginID, len(*records))
return outputPlugin.processAPIResponse(records, dataLength, response)
}