processRecord()

in kinesis/kinesis.go [430:491]
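
processRecord prepares a single Fluent Bit record for delivery to Kinesis: it optionally filters the record down to the configured data keys, decodes it into JSON-marshalable types, serializes it (either the whole record or a single log key), optionally appends a newline, compresses it if configured, and truncates it so that record data plus partition key stay within the 1 MB Kinesis record limit.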


func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface{}, partitionKeyLen int) ([]byte, error) {
	if outputPlugin.dataKeys != "" {
		record = plugins.DataKeys(outputPlugin.dataKeys, record)
	}

	var err error
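	// convert raw msgpack []byte keys and values into JSON-marshalable types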
	record, err = plugins.DecodeMap(record)
	if err != nil {
		logrus.Debugf("[kinesis %d] Failed to decode record: %v\n", outputPlugin.PluginID, record)
		return nil, err
	}

	// optionally replace '.' characters in key names with the configured value
	if outputPlugin.replaceDots != "" {
		record = replaceDots(record, outputPlugin.replaceDots)
	}

	var json = jsoniter.ConfigCompatibleWithStandardLibrary
	var data []byte

	if outputPlugin.logKey != "" {
		log, err := plugins.LogKey(record, outputPlugin.logKey)
		if err != nil {
			return nil, err
		}

		// check this error inside the block: the := above shadows the outer
		// err, so an EncodeLogKey failure would never reach the check below
		data, err = plugins.EncodeLogKey(log)
		if err != nil {
			return nil, err
		}
	} else {
		data, err = json.Marshal(record)
	}

	if err != nil {
		logrus.Debugf("[kinesis %d] Failed to marshal record: %v\n", outputPlugin.PluginID, record)
		return nil, err
	}

	// append a newline after each log record
	if outputPlugin.appendNewline {
		data = append(data, []byte("\n")...)
	}

	// maximum size of the data blob, leaving room for the partition key
	// (Kinesis counts both against the 1 MB record limit)
	maxDataSize := maximumRecordSize - partitionKeyLen

	switch outputPlugin.compression {
	case CompressionZlib:
		data, err = compressThenTruncate(zlibCompress, data, maxDataSize, []byte(truncatedSuffix), *outputPlugin)
	case CompressionGzip:
		data, err = compressThenTruncate(gzipCompress, data, maxDataSize, []byte(truncatedSuffix), *outputPlugin)
	}
	if err != nil {
		return nil, err
	}

	// the record plus its partition key must fit within the 1 MB Kinesis
	// record limit; truncate oversized data and mark it with the suffix
	if len(data)+partitionKeyLen > maximumRecordSize {
		logrus.Warnf("[kinesis %d] Found record with %d bytes, truncating to 1MB, stream=%s\n", outputPlugin.PluginID, len(data)+partitionKeyLen, outputPlugin.stream)
		data = data[:maxDataSize-len(truncatedSuffix)]
		data = append(data, []byte(truncatedSuffix)...)
	}

	return data, nil
}
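
The compressThenTruncate helper is not shown in this excerpt. Below is a minimal sketch of the contract its call sites imply, assuming the compressor arguments (zlibCompress, gzipCompress) have the shape func([]byte) ([]byte, error) and that the helper trims its input and recompresses until the output fits. The names compressor, gzipCompressSketch, and compressThenTruncateSketch, as well as the 10% trim step, are illustrative assumptions, not the plugin's code; the real helper also receives the OutputPlugin (presumably for logging), which is omitted here.

package sketch

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// compressor is the assumed shape of the zlibCompress/gzipCompress arguments.
type compressor func([]byte) ([]byte, error)

// gzipCompressSketch is a hypothetical stand-in for the plugin's gzipCompress helper.
func gzipCompressSketch(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(data); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// compressThenTruncateSketch compresses data; while the compressed output
// still exceeds maxSize, it trims the input, marks it with suffix, and
// recompresses. The 10%-per-attempt trim step is an assumption.
func compressThenTruncateSketch(compress compressor, data []byte, maxSize int, suffix []byte) ([]byte, error) {
	for {
		compressed, err := compress(data)
		if err != nil {
			return nil, err
		}
		if len(compressed) <= maxSize {
			return compressed, nil
		}
		cut := len(data) - len(data)/10 - len(suffix)
		// bail out once the input can no longer shrink, to guarantee progress
		if cut <= 0 || cut+len(suffix) >= len(data) {
			return nil, fmt.Errorf("record does not fit in %d bytes", maxSize)
		}
		data = append(data[:cut:cut], suffix...)
	}
}

Whatever strategy the real helper uses, the contract visible at the call sites above is that the returned data either fits within maxDataSize (so the partition key still fits alongside it) or an error is returned.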