in kinesis/kinesis.go [655:732]
// compressThenTruncate compresses data with compressorFunc and returns the
// compressed bytes once they are no longer than maxOutLen bytes.
//
// When the compressed output is too long, the input is shortened, its tail is
// overwritten with truncatedSuffix, and the shortened input is compressed
// again. Each round estimates the next input length by scaling the current
// length by maxOutLen/compressedLen and then by truncationReductionPercent;
// the length is forced to shrink by at least one byte per round so the loop
// always makes progress.
//
// Parameters:
//   - compressorFunc: compression routine applied to the (possibly truncated) input.
//   - data: the uncompressed record. This function never writes into data; a
//     private copy is made before the suffix is applied.
//   - maxOutLen: maximum allowed length of the compressed output, in bytes.
//   - truncatedSuffix: marker written over the tail of a truncated input.
//   - outputPlugin: supplies PluginID and stream for log messages.
//
// An error is returned when compressorFunc fails (its error is passed through
// unchanged), when even an empty input compresses to more than maxOutLen
// bytes, when more than truncationCompressionMaxAttempts truncation rounds
// are needed, or when the shortened input is too small to hold
// truncatedSuffix.
func compressThenTruncate(compressorFunc CompressorFunc, data []byte, maxOutLen int, truncatedSuffix []byte, outputPlugin OutputPlugin) ([]byte, error) {
	var (
		compressedData        []byte
		originalCompressedLen int
		isTruncated           bool
	)

	// Start above any realistic limit so the first compression pass always runs.
	compressedLen := math.MaxInt64
	truncatedInLen := len(data)
	// The first pass compresses the caller's slice directly; it is replaced by
	// a private copy as soon as truncation is required.
	truncationBuffer := data
	truncationCompressionAttempts := 0

	/* Iterative approach to truncation */
	for compressedLen > maxOutLen {
		var err error
		compressedData, err = compressorFunc(truncationBuffer)
		if err != nil {
			return nil, err
		}
		compressedLen = len(compressedData)
		if compressedLen <= maxOutLen {
			// Output fits; nothing (more) to truncate.
			break
		}

		/* Truncation needed */
		truncationCompressionAttempts++
		logrus.Debugf("[kinesis %d] iterative truncation round stream=%s\n",
			outputPlugin.PluginID, outputPlugin.stream)

		/* Base case: input compressed empty string, output still too large */
		if truncatedInLen == 0 {
			logrus.Errorf("[kinesis %d] truncation failed, compressed empty input too "+
				"large stream=%s\n", outputPlugin.PluginID, outputPlugin.stream)
			return nil, errors.New("compressed empty to large")
		}

		/* Base case: too many attempts - just to be extra safe */
		if truncationCompressionAttempts > truncationCompressionMaxAttempts {
			logrus.Errorf("[kinesis %d] truncation failed, too many compression attempts "+
				"stream=%s\n", outputPlugin.PluginID, outputPlugin.stream)
			return nil, errors.New("too many compression attempts")
		}

		/* Calculate corrected input size */
		truncatedInLenPrev := truncatedInLen
		truncatedInLen = (maxOutLen * truncatedInLen) / compressedLen
		truncatedInLen = (truncatedInLen * truncationReductionPercent) / 100

		/* Ensure working down */
		if truncatedInLen >= truncatedInLenPrev {
			truncatedInLen = truncatedInLenPrev - 1
		}

		// Validate the new length before allocating or slicing with it. This
		// also rejects a negative length, which would otherwise panic in make
		// or in the slice expressions below.
		if truncatedInLen < len(truncatedSuffix) {
			/* No room for the truncation suffix. Terminal error */
			logrus.Errorf("[kinesis %d] truncation failed, no room for suffix "+
				"stream=%s\n", outputPlugin.PluginID, outputPlugin.stream)
			return nil, errors.New("no room for suffix")
		}

		/* Allocate truncation buffer on the first truncation round only */
		if !isTruncated {
			isTruncated = true
			originalCompressedLen = compressedLen
			truncationBuffer = make([]byte, truncatedInLen)
			copy(truncationBuffer, data[:truncatedInLen])
		}

		/* Shrink the buffer and slap the truncation suffix onto its tail */
		truncationBuffer = truncationBuffer[:truncatedInLen]
		copy(truncationBuffer[len(truncationBuffer)-len(truncatedSuffix):], truncatedSuffix)
	}

	if isTruncated {
		logrus.Warnf("[kinesis %d] Found compressed record with %d bytes, "+
			"truncating to %d bytes after compression, stream=%s\n",
			outputPlugin.PluginID, originalCompressedLen, len(compressedData), outputPlugin.stream)
	}
	return compressedData, nil
}