in kll/items_sketch.go [835:904]
// compressWhileUpdatingSketch compacts one level of the sketch to free up
// slots at the low end of the items array.
//
// The level to compact is chosen by findLevelToCompact. An even number of that
// level's items is halved and the survivors are moved into the level above;
// when the level holds an odd number of items, the first one is kept behind as
// the level's single remaining item. Everything stored below the compacted
// level is then shifted up by the number of freed slots and the level
// boundaries in s.levels are adjusted to match. Finally s.items is replaced by
// the working array obtained from GetTotalItemsArray.
func (s *ItemsSketch[C]) compressWhileUpdatingSketch() {
	level := findLevelToCompact(s.k, s.m, s.numLevels, s.levels)
	if level == s.numLevels-1 {
		// The level to compact is the top level, thus we need to add a level.
		// Be aware that this operation grows the items array,
		// shifts the items data and the level boundaries of the data,
		// and grows the levels array and increments numLevels.
		s.addEmptyTopLevelToCompletelyFullSketch()
	}

	// Fetched only after the possible growth above. The boundary updates below
	// write through s.levels and re-read through myLevelsArr, which relies on
	// both referring to the same underlying storage.
	myLevelsArr := s.levels
	rawBeg := myLevelsArr[level]
	rawEnd := myLevelsArr[level+1]
	// +2 is OK because we already added a new top level if necessary
	popAbove := myLevelsArr[level+2] - rawEnd
	rawPop := rawEnd - rawBeg
	oddPop := rawPop%2 == 1

	// Only an even number of items can be halved: with an odd population the
	// item at rawBeg is skipped and handled separately further down.
	adjBeg, adjPop := rawBeg, rawPop
	if oddPop {
		adjBeg++
		adjPop--
	}
	halfAdjPop := adjPop / 2

	// the following is specific to generic Items
	myItemsArr := s.GetTotalItemsArray()
	if level == 0 { // level zero might not be sorted, so we must sort it if we wish to compact it
		tmpSlice := myItemsArr[adjBeg : adjBeg+adjPop]
		sort.Slice(tmpSlice, func(a, b int) bool {
			return s.compareFn(tmpSlice[a], tmpSlice[b])
		})
	}

	if popAbove == 0 {
		// Nothing to merge with: the survivors are expected to end up in the
		// upper half of the range, directly below rawEnd.
		randomlyHalveUpItems(myItemsArr, adjBeg, adjPop, s.deterministicOffsetForTest)
	} else {
		// The survivors are expected in the lower half of the range; they are
		// then merged with the items of the level above, writing the result
		// starting at adjBeg+halfAdjPop.
		randomlyHalveDownItems(myItemsArr, adjBeg, adjPop, s.deterministicOffsetForTest)
		mergeSortedItemsArrays(
			myItemsArr, adjBeg, halfAdjPop,
			myItemsArr, rawEnd, popAbove,
			myItemsArr, adjBeg+halfAdjPop, s.compareFn)
	}

	// adjust boundaries of the level above
	s.levels[level+1] = myLevelsArr[level+1] - halfAdjPop

	if oddPop {
		s.levels[level] = myLevelsArr[level+1] - 1          // the current level now contains one item
		myItemsArr[myLevelsArr[level]] = myItemsArr[rawBeg] // namely this leftover guy
	} else {
		s.levels[level] = myLevelsArr[level+1] // the current level is now empty
	}

	// Shift the data of the levels below up by halfAdjPop so that the freed
	// space ends up below level zero. Source and destination overlap, which the
	// built-in copy handles correctly. The explicit upper bounds keep an
	// out-of-range index a panic rather than a silently shortened copy.
	if level > 0 {
		lowBeg := myLevelsArr[0]
		copy(myItemsArr[lowBeg+halfAdjPop:rawBeg+halfAdjPop], myItemsArr[lowBeg:rawBeg])
	}
	for lvl := uint8(0); lvl < level; lvl++ {
		s.levels[lvl] = myLevelsArr[lvl] + halfAdjPop // adjust boundary
	}

	s.items = myItemsArr
}