func()

in firehose/firehose.go [252:297]


// processRecord converts a single record into the byte payload that is sent
// to the delivery stream.
//
// Steps, in order:
//  1. If output.dataKeys is set, the record is passed through plugins.DataKeys.
//  2. The record is passed through plugins.DecodeMap.
//  3. If output.replaceDots is set, replaceDots is applied to the record.
//  4. If output.logKey is set, only the value returned by plugins.LogKey is
//     encoded (via plugins.EncodeLogKey); otherwise the whole record is
//     marshaled as JSON.
//  5. A newline is appended, and a payload longer than maximumRecordSize is
//     cut down so that, with truncatedSuffix appended, it is exactly
//     maximumRecordSize bytes long.
//
// It returns the encoded bytes, or nil and the error of the first step that
// failed.
func (output *OutputPlugin) processRecord(record map[interface{}]interface{}) ([]byte, error) {
	if output.dataKeys != "" {
		record = plugins.DataKeys(output.dataKeys, record)
	}

	var err error
	record, err = plugins.DecodeMap(record)
	if err != nil {
		// record now holds whatever the failed decode returned, not the input,
		// so the error itself is the useful thing to log.
		logrus.Debugf("[firehose %d] Failed to decode record: %v\n", output.PluginID, err)
		return nil, err
	}

	if output.replaceDots != "" {
		record = replaceDots(record, output.replaceDots)
	}

	var data []byte

	if output.logKey != "" {
		// The lookup error gets its own name on purpose: declaring `err` with
		// := here would shadow the function-level err, and the encode error
		// assigned below would then never reach the shared check that follows.
		logValue, lookupErr := plugins.LogKey(record, output.logKey)
		if lookupErr != nil {
			return nil, lookupErr
		}

		data, err = plugins.EncodeLogKey(logValue)
	} else {
		data, err = jsoniter.ConfigCompatibleWithStandardLibrary.Marshal(record)
	}

	// Covers both the log-key encoding and the full-record marshal paths.
	if err != nil {
		logrus.Debugf("[firehose %d] Failed to marshal record: %v\n", output.PluginID, record)
		return nil, err
	}

	// Terminate the record with a newline.
	data = append(data, '\n')

	if len(data) > maximumRecordSize {
		logrus.Warnf("[firehose %d] Found record with %d bytes, truncating to 1000Kib, stream=%s\n", output.PluginID, len(data), output.deliveryStream)
		// Leave room for the suffix so the final payload is exactly
		// maximumRecordSize bytes.
		// NOTE(review): the newline appended above is cut off here, so a
		// truncated payload ends with truncatedSuffix rather than "\n" unless
		// truncatedSuffix itself ends in one — confirm this is intended.
		data = data[:maximumRecordSize-len(truncatedSuffix)]
		data = append(data, []byte(truncatedSuffix)...)
	}

	return data, nil
}