func generalItemsCompress[C comparable]()

in kll/items_sketch.go [1095:1193]


func generalItemsCompress[C comparable](
	k uint16,
	m uint8,
	numLevelsIn uint8,
	inBuf []C,
	inLevels []uint32,
	outBuf []C,
	outLevels []uint32,
	isLevelZeroSorted bool,
	compareFn common.CompareFn[C],
	deterministicOffsetForTest bool,
) []uint32 {
	numLevels := numLevelsIn
	currentItemCount := inLevels[numLevels] - inLevels[0]        // decreases with each compaction
	targetItemCount := computeTotalItemCapacity(k, m, numLevels) // increases if we add levels
	doneYet := false
	outLevels[0] = 0
	curLevel := -1
	for !doneYet {
		curLevel++ // start out at level 0

		// If we are at the current top level, add an empty level above it for convenience,
		// but do not increment numLevels until later
		if curLevel == (int(numLevels) - 1) {
			inLevels[curLevel+2] = inLevels[curLevel+1]
		}

		rawBeg := inLevels[curLevel]
		rawLim := inLevels[curLevel+1]
		rawPop := rawLim - rawBeg

		if (uint32(currentItemCount) < targetItemCount) || (rawPop < levelCapacity(k, numLevels, uint8(curLevel), m)) {
			for i := uint32(0); i < rawPop; i++ {
				outBuf[outLevels[curLevel]+i] = inBuf[rawBeg+i]
			}
			outLevels[curLevel+1] = outLevels[curLevel] + rawPop
		} else {
			// The sketch is too full AND this level is too full, so we compact it
			// Note: this can add a level and thus change the sketch's capacity

			popAbove := inLevels[curLevel+2] - rawLim
			oddPop := rawPop%2 == 1
			adjBeg := rawBeg
			if oddPop {
				adjBeg++
			}
			adjPop := rawPop
			if oddPop {
				adjPop--
			}
			halfAdjPop := adjPop / 2

			if oddPop {
				outBuf[outLevels[curLevel]] = inBuf[rawBeg]
				outLevels[curLevel+1] = outLevels[curLevel] + 1
			} else {
				outLevels[curLevel+1] = outLevels[curLevel]
			}

			// level zero might not be sorted, so we must sort it if we wish to compact it
			if (curLevel == 0) && !isLevelZeroSorted {
				tmpSlice := inBuf[adjBeg : adjBeg+adjPop]
				sort.Slice(tmpSlice, func(a, b int) bool {
					return compareFn(tmpSlice[a], tmpSlice[b])
				})
			}

			if popAbove == 0 {
				randomlyHalveUpItems(inBuf, adjBeg, adjPop, deterministicOffsetForTest)
			} else {
				randomlyHalveDownItems(inBuf, adjBeg, adjPop, deterministicOffsetForTest)
				mergeSortedItemsArrays(
					inBuf, adjBeg, halfAdjPop,
					inBuf, rawLim, popAbove,
					inBuf, adjBeg+halfAdjPop, compareFn)
			}

			// track the fact that we just eliminated some data
			currentItemCount -= halfAdjPop

			// Adjust the boundaries of the level above
			inLevels[curLevel+1] = inLevels[curLevel+1] - halfAdjPop

			// Increment numLevels if we just compacted the old top level
			// This creates some more capacity (the size of the new bottom level)
			if curLevel == (int(numLevels) - 1) {
				numLevels++
				targetItemCount += levelCapacity(k, numLevels, 0, m)
			}
		} // end of code for compacting a level

		// determine whether we have processed all levels yet (including any new levels that we created)
		if curLevel == (int(numLevels) - 1) {
			doneYet = true
		}
	} // end of loop over levels

	return []uint32{uint32(numLevels), targetItemCount, currentItemCount}
}