in firehose/firehose.go [183:221]
// AddRecord serializes a single record and queues it in the plugin's pending
// batch, sending the current batch first when the new record would not fit.
//
// When output.timeKey is set, the timestamp is formatted with
// output.fmtStrftime and stored in the record under that key before the
// record is processed.
//
// Return values:
//   - fluentbit.FLB_ERROR if the timestamp could not be formatted;
//   - fluentbit.FLB_OK if processRecord fails (the error is logged and the
//     record is dropped so the rest of the batch can continue);
//   - the code returned by sendCurrentBatch when a flush was needed and it
//     did not return fluentbit.FLB_OK (the new record is not queued);
//   - fluentbit.FLB_OK once the record has been queued.
func (output *OutputPlugin) AddRecord(record map[interface{}]interface{}, timeStamp *time.Time) int {
	// Stamp the record only when a time key has been configured.
	if output.timeKey != "" {
		var formatted bytes.Buffer
		if err := output.fmtStrftime.Format(&formatted, *timeStamp); err != nil {
			logrus.Errorf("[firehose %d] Could not create timestamp %v\n", output.PluginID, err)
			return fluentbit.FLB_ERROR
		}
		record[output.timeKey] = formatted.String()
	}

	payload, err := output.processRecord(record)
	if err != nil {
		logrus.Errorf("[firehose %d] %v\n", output.PluginID, err)
		// A single bad record is dropped; reporting OK lets the batch continue.
		return fluentbit.FLB_OK
	}
	payloadSize := len(payload)

	// Flush before queueing if the batch is at its record limit or the new
	// payload would push it over the byte limit.
	// NOTE(review): the count check is an exact match, so it relies on the
	// batch never exceeding maximumRecordsPerPut elsewhere — confirm.
	atRecordLimit := len(output.records) == maximumRecordsPerPut
	overByteLimit := output.dataLength+payloadSize > maximumPutRecordBatchSize
	if atRecordLimit || overByteLimit {
		status, sendErr := output.sendCurrentBatch()
		if sendErr != nil {
			logrus.Errorf("[firehose %d] %v\n", output.PluginID, sendErr)
		}
		if status != fluentbit.FLB_OK {
			return status
		}
	}

	// With simple aggregation enabled, the payload is appended to the most
	// recent queued record as long as the combined size stays within
	// maximumRecordSize; otherwise it becomes a new record of its own.
	lastIdx := len(output.records) - 1
	mergeIntoLast := output.simpleAggregation &&
		lastIdx >= 0 &&
		len(output.records[lastIdx].Data)+payloadSize <= maximumRecordSize
	if mergeIntoLast {
		tail := output.records[lastIdx]
		tail.Data = append(tail.Data, payload...)
	} else {
		output.records = append(output.records, &firehose.Record{Data: payload})
	}

	output.dataLength += payloadSize
	return fluentbit.FLB_OK
}