in kinesis/kinesis.go [306:345]
// Flush groups the entries in *records into batches and hands each batch to
// sendCurrentBatch.
//
// A batch is dispatched as soon as it already holds maximumRecordsPerPut entries,
// or when adding the next entry (data bytes plus partition-key bytes) would push
// it past maximumPutRecordBatchSize. Whatever is still staged after the last
// entry is dispatched once more at the end.
//
// On return, *records is overwritten with the entries that still need to be
// delivered: whatever sendCurrentBatch left in the staging buffer, followed (on
// an early failure) by the entries that had not been batched yet. The returned
// int is the status code of the last sendCurrentBatch call.
func (outputPlugin *OutputPlugin) Flush(records *[]*kinesis.PutRecordsRequestEntry) int {
	// Stage entries in a dedicated buffer so the caller's slice is only
	// replaced once we know what is left over.
	batch := make([]*kinesis.PutRecordsRequestEntry, 0, maximumRecordsPerPut)
	batchBytes := 0

	// dispatch sends the staged entries, logs any error, and yields the status code.
	dispatch := func() int {
		code, sendErr := outputPlugin.sendCurrentBatch(&batch, &batchBytes)
		if sendErr != nil {
			logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, sendErr)
		}
		return code
	}

	for idx, entry := range *records {
		entrySize := len(entry.Data) + len(aws.StringValue(entry.PartitionKey))

		// The entry fits only if the batch has a free slot and stays within the byte budget.
		fits := len(batch) != maximumRecordsPerPut && batchBytes+entrySize <= maximumPutRecordBatchSize
		if !fits {
			if code := dispatch(); code != fluentbit.FLB_OK {
				// The staging buffer now holds what sendCurrentBatch could not
				// deliver; put those ahead of the entries not yet batched
				// (including the current one).
				remaining := (*records)[idx:]
				*records = append(batch, remaining...)
				return code
			}
		}

		batch = append(batch, entry)
		batchBytes += entrySize
	}

	// Dispatch whatever is still staged after the final entry.
	status := dispatch()
	if status == output.FLB_OK {
		logrus.Debugf("[kinesis %d] Flushed %d logs\n", outputPlugin.PluginID, len(*records))
	}

	// Anything sendCurrentBatch failed to deliver remains in the staging buffer.
	*records = batch
	return status
}