in firehose/firehose.go [252:297]
// processRecord converts a single record into the byte payload that is handed
// on for delivery.
//
// Steps, in order:
//  1. If output.dataKeys is set, the record is passed through plugins.DataKeys.
//  2. The record is passed through plugins.DecodeMap; a failure aborts.
//  3. If output.replaceDots is set, the record is passed through replaceDots.
//  4. If output.logKey is set, only the value returned by plugins.LogKey is
//     encoded (via plugins.EncodeLogKey); otherwise the whole record is
//     marshalled to JSON with jsoniter's standard-library-compatible config.
//  5. A trailing newline is appended.
//  6. If the result is longer than maximumRecordSize, it is cut so that, with
//     truncatedSuffix appended, it is exactly maximumRecordSize bytes long.
//
// It returns the encoded bytes, or a nil slice and the error from whichever
// decode/extract/encode step failed.
func (output *OutputPlugin) processRecord(record map[interface{}]interface{}) ([]byte, error) {
	if output.dataKeys != "" {
		record = plugins.DataKeys(output.dataKeys, record)
	}

	var err error
	record, err = plugins.DecodeMap(record)
	if err != nil {
		logrus.Debugf("[firehose %d] Failed to decode record: %v\n", output.PluginID, record)
		return nil, err
	}

	if output.replaceDots != "" {
		record = replaceDots(record, output.replaceDots)
	}

	var data []byte
	if output.logKey != "" {
		// Use a distinct error name for the lookup: declaring `err` with :=
		// here would shadow the function-level err, and the EncodeLogKey
		// failure below would then never reach the shared check.
		logValue, lookupErr := plugins.LogKey(record, output.logKey)
		if lookupErr != nil {
			return nil, lookupErr
		}
		data, err = plugins.EncodeLogKey(logValue)
	} else {
		data, err = jsoniter.ConfigCompatibleWithStandardLibrary.Marshal(record)
	}
	if err != nil {
		logrus.Debugf("[firehose %d] Failed to marshal record: %v\n", output.PluginID, record)
		return nil, err
	}

	// Terminate the record with a newline.
	data = append(data, '\n')

	if len(data) > maximumRecordSize {
		logrus.Warnf("[firehose %d] Found record with %d bytes, truncating to 1000Kib, stream=%s\n", output.PluginID, len(data), output.deliveryStream)
		// Leave room for the suffix so the final length equals maximumRecordSize.
		// NOTE(review): this cut drops the newline appended above; the result
		// only ends in a newline if truncatedSuffix itself does — confirm.
		data = data[:maximumRecordSize-len(truncatedSuffix)]
		data = append(data, truncatedSuffix...)
	}

	return data, nil
}