in kinesis/kinesis.go [237:285]
func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsRequestEntry, record map[interface{}]interface{}, timeStamp *time.Time) int {
if outputPlugin.timeKey != "" {
buf := new(bytes.Buffer)
err := outputPlugin.fmtStrftime.Format(buf, *timeStamp)
if err != nil {
logrus.Errorf("[kinesis %d] Could not create timestamp %v\n", outputPlugin.PluginID, err)
return fluentbit.FLB_ERROR
}
record[outputPlugin.timeKey] = buf.String()
}
partitionKey, hasPartitionKey := outputPlugin.getPartitionKey(record)
var partitionKeyLen = len(partitionKey)
if !hasPartitionKey {
partitionKeyLen = outputPlugin.stringGen.Size
}
data, err := outputPlugin.processRecord(record, partitionKeyLen)
if err != nil {
logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err)
// discard this single bad record instead and let the batch continue
return fluentbit.FLB_OK
}
if !outputPlugin.isAggregate {
if !hasPartitionKey {
partitionKey = outputPlugin.stringGen.RandomString()
}
logrus.Debugf("[kinesis %d] Got value: %s for a given partition key.\n", outputPlugin.PluginID, partitionKey)
*records = append(*records, &kinesis.PutRecordsRequestEntry{
Data: data,
PartitionKey: aws.String(partitionKey),
})
} else {
// Use the KPL aggregator to buffer records isAggregate is true
aggRecord, err := outputPlugin.aggregator.AddRecord(partitionKey, hasPartitionKey, data)
if err != nil {
logrus.Errorf("[kinesis %d] Failed to aggregate record %v\n", outputPlugin.PluginID, err)
// discard this single bad record instead and let the batch continue
return fluentbit.FLB_OK
}
// If aggRecord isn't nil, then a full kinesis record has been aggregated
if aggRecord != nil {
*records = append(*records, aggRecord)
}
}
return fluentbit.FLB_OK
}