func compressThenTruncate()

in kinesis/kinesis.go [655:732]


func compressThenTruncate(compressorFunc CompressorFunc, data []byte, maxOutLen int, truncatedSuffix []byte, outputPlugin OutputPlugin) ([]byte, error) {
	var compressedData []byte
	var truncationBuffer []byte
	var originalCompressedLen int
	var compressedLen int
	var err error

	/* Iterative approach to truncation */
	isTruncated := false
	compressedLen = math.MaxInt64
	truncatedInLen := len(data)
	truncationBuffer = data
	truncationCompressionAttempts := 0
	for (compressedLen > maxOutLen) {
		compressedData, err = compressorFunc(truncationBuffer)
		if err != nil {
			return nil, err
		}
		compressedLen = len(compressedData)

		/* Truncation needed */
		if (compressedLen > maxOutLen) {
			truncationCompressionAttempts++
			logrus.Debugf("[kinesis %d] iterative truncation round stream=%s\n",
						 outputPlugin.PluginID, outputPlugin.stream)

			/* Base case: input compressed empty string, output still too large */
			if (truncatedInLen == 0) {
				logrus.Errorf("[kinesis %d] truncation failed, compressed empty input too " +
							 "large stream=%s\n", outputPlugin.PluginID, outputPlugin.stream)
				return nil, errors.New("compressed empty to large");
			}

			/* Base case: too many attempts - just to be extra safe */
			if (truncationCompressionAttempts > truncationCompressionMaxAttempts) {
				logrus.Errorf("[kinesis %d] truncation failed, too many compression attempts " +
							 "stream=%s\n", outputPlugin.PluginID, outputPlugin.stream)
				return nil, errors.New("too many compression attempts");
			}

			/* Calculate corrected input size */
			truncatedInLenPrev := truncatedInLen;
			truncatedInLen = (maxOutLen * truncatedInLen) / compressedLen;
			truncatedInLen = (truncatedInLen * truncationReductionPercent) / 100;

			/* Ensure working down */
			if (truncatedInLen >= truncatedInLenPrev) {
				truncatedInLen = truncatedInLenPrev - 1;
			}

			/* Allocate truncation buffer */
			if (!isTruncated) {
				isTruncated = true;
				originalCompressedLen = compressedLen
				truncationBuffer = make([]byte, truncatedInLen)
				copy(truncationBuffer, data[:truncatedInLen])
			}

			/* Slap on truncation suffix */
			if (truncatedInLen < len(truncatedSuffix)) {
				/* No room for the truncation suffix. Terminal error */
				logrus.Errorf("[kinesis %d] truncation failed, no room for suffix " +
							 "stream=%s\n", outputPlugin.PluginID, outputPlugin.stream)
				return nil, errors.New("no room for suffix");
			}
			truncationBuffer = truncationBuffer[:truncatedInLen]
			copy(truncationBuffer[len(truncationBuffer)-len(truncatedSuffix):], truncatedSuffix)
		}
	}

	if (isTruncated) {
		logrus.Warnf("[kinesis %d] Found compressed record with %d bytes, " +
					 "truncating to %d bytes after compression, stream=%s\n",
					 outputPlugin.PluginID, originalCompressedLen, len(compressedData), outputPlugin.stream)
	}

	return compressedData, nil
}