in kinesis/kinesis.go [430:491]
func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface{}, partitionKeyLen int) ([]byte, error) {
if outputPlugin.dataKeys != "" {
record = plugins.DataKeys(outputPlugin.dataKeys, record)
}
var err error
record, err = plugins.DecodeMap(record)
if err != nil {
logrus.Debugf("[kinesis %d] Failed to decode record: %v\n", outputPlugin.PluginID, record)
return nil, err
}
if outputPlugin.replaceDots != "" {
record = replaceDots(record, outputPlugin.replaceDots)
}
var json = jsoniter.ConfigCompatibleWithStandardLibrary
var data []byte
if outputPlugin.logKey != "" {
log, err := plugins.LogKey(record, outputPlugin.logKey)
if err != nil {
return nil, err
}
data, err = plugins.EncodeLogKey(log)
} else {
data, err = json.Marshal(record)
}
if err != nil {
logrus.Debugf("[kinesis %d] Failed to marshal record: %v\n", outputPlugin.PluginID, record)
return nil, err
}
// append a newline after each log record
if outputPlugin.appendNewline {
data = append(data, []byte("\n")...)
}
// max truncation size
maxDataSize := maximumRecordSize-partitionKeyLen
switch outputPlugin.compression {
case CompressionZlib:
data, err = compressThenTruncate(zlibCompress, data, maxDataSize, []byte(truncatedSuffix), *outputPlugin)
case CompressionGzip:
data, err = compressThenTruncate(gzipCompress, data, maxDataSize, []byte(truncatedSuffix), *outputPlugin)
default:
}
if err != nil {
return nil, err
}
if len(data)+partitionKeyLen > maximumRecordSize {
logrus.Warnf("[kinesis %d] Found record with %d bytes, truncating to 1MB, stream=%s\n", outputPlugin.PluginID, len(data)+partitionKeyLen, outputPlugin.stream)
data = data[:maxDataSize-len(truncatedSuffix)]
data = append(data, []byte(truncatedSuffix)...)
}
return data, nil
}