kll/items_sketch.go (877 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ // Package kll is an implementation of a very compact quantiles sketch with lazy compaction scheme // and nearly optimal accuracy per retained quantile.</p> // // Reference: https://arxiv.org/abs/1603.05346v2" Optimal Quantile Approximation in Streams // // The default k of 200 yields a "single-sided" epsilon of about 1.33% and a // "double-sided" (PMF) epsilon of about 1.65%, with a confidence of 99%. // // See "https://datasketches.apache.org/docs/KLL/KLLSketch.html" KLL Sketch package kll import ( "encoding/binary" "fmt" "github.com/apache/datasketches-go/common" "github.com/apache/datasketches-go/internal" "math/rand" "sort" ) type ItemsSketch[C comparable] struct { // k is the config that controls the accuracy of the sketch and its memory space usage // The default k = 200 results in a normalized rank error of about 1.65%. k uint16 // m is the number of items in the base level of the KLL array m uint8 minK uint16 numLevels uint8 isLevelZeroSorted bool n uint64 levels []uint32 items []C minItem *C maxItem *C sortedView *ItemsSketchSortedView[C] serde common.ItemSketchSerde[C] compareFn common.CompareFn[C] // Force deterministic offset for test, so that we can compare results across implementation. deterministicOffsetForTest bool } const ( _DEFAULT_K = uint16(200) _DEFAULT_M = uint8(8) _MIN_K = uint16(_DEFAULT_M) _MAX_K = (1 << 16) - 1 _MIN_M = 2 //The minimum M _MAX_M = 8 //The maximum M ) var ( powersOfThree = []uint64{1, 3, 9, 27, 81, 243, 729, 2187, 6561, 19683, 59049, 177147, 531441, 1594323, 4782969, 14348907, 43046721, 129140163, 387420489, 1162261467, 3486784401, 10460353203, 31381059609, 94143178827, 282429536481, 847288609443, 2541865828329, 7625597484987, 22876792454961, 68630377364883, 205891132094649} // Used for deterministic rand behavior in tests nextOffsetForTest = 0 ) // NewKllItemsSketch create a new ItemsSketch with the given k and m. // The default k = 200 results in a normalized rank error of about 1.65%. // Larger K will have smaller error but the sketch will be larger (and slower). func NewKllItemsSketch[C comparable](k uint16, m uint8, compareFn common.CompareFn[C], serde common.ItemSketchSerde[C]) (*ItemsSketch[C], error) { if k < _MIN_K || k > _MAX_K { return nil, fmt.Errorf("k must be >= %d and <= %d: %d", _MIN_K, _MAX_K, k) } if compareFn == nil { return nil, fmt.Errorf("no compare function provided") } return &ItemsSketch[C]{ k: k, m: m, minK: k, numLevels: uint8(1), levels: []uint32{uint32(k), uint32(k)}, items: make([]C, k), serde: serde, compareFn: compareFn, }, nil } // NewKllItemsSketchWithDefault create a new ItemsSketch with default k and m. // The default k = 200 results in a normalized rank error of about 1.65%. func NewKllItemsSketchWithDefault[C comparable](compareFn common.CompareFn[C], serde common.ItemSketchSerde[C]) (*ItemsSketch[C], error) { return NewKllItemsSketch[C](_DEFAULT_K, _DEFAULT_M, compareFn, serde) } // NewKllItemsSketchFromSlice create a new ItemsSketch from the given byte slice (serialized sketch). func NewKllItemsSketchFromSlice[C comparable](sl []byte, compareFn common.CompareFn[C], serde common.ItemSketchSerde[C]) (*ItemsSketch[C], error) { if serde == nil { return nil, fmt.Errorf("no SerDe provided") } if compareFn == nil { return nil, fmt.Errorf("no compare function provided") } memVal, err := newItemsSketchMemoryValidate(sl, serde) if err != nil { return nil, err } var ( k = memVal.k m = memVal.m levelsArr = memVal.levelsArr n = memVal.n minK = memVal.minK isLevelZeroSorted = memVal.level0SortedFlag minItem *C maxItem *C items = make([]C, levelsArr[memVal.numLevels]) ) switch memVal.sketchStructure { case _COMPACT_EMPTY: minItem = nil maxItem = nil items = make([]C, k) case _COMPACT_SINGLE: offset := _N_LONG_ADR deserItems, err := serde.DeserializeManyFromSlice(sl, offset, 1) if err != nil { return nil, err } minItem = &deserItems[0] maxItem = &deserItems[0] items = make([]C, k) items[k-1] = deserItems[0] case _COMPACT_FULL: offset := int(_DATA_START_ADR + memVal.numLevels*4) deserMinItems, err := serde.DeserializeManyFromSlice(sl, offset, 1) minItem = &deserMinItems[0] if err != nil { return nil, err } offset += serde.SizeOf(*minItem) deserMaxItems, err := serde.DeserializeManyFromSlice(sl, offset, 1) maxItem = &deserMaxItems[0] if err != nil { return nil, err } offset += serde.SizeOf(*maxItem) numRetained := levelsArr[memVal.numLevels] - levelsArr[0] deseRetItems, err := serde.DeserializeManyFromSlice(sl, offset, int(numRetained)) if err != nil { return nil, err } for i := uint32(0); i < numRetained; i++ { items[i+levelsArr[0]] = deseRetItems[i] } } return &ItemsSketch[C]{ k: k, m: m, minK: minK, numLevels: memVal.numLevels, isLevelZeroSorted: isLevelZeroSorted, n: n, levels: levelsArr, items: items, minItem: minItem, maxItem: maxItem, serde: serde, compareFn: compareFn, }, nil } // IsEmpty returns true if the sketch is empty, otherwise false. func (s *ItemsSketch[C]) IsEmpty() bool { return s.n == 0 } // GetN returns the value of n (the length of the input stream offered to the sketch) func (s *ItemsSketch[C]) GetN() uint64 { return s.n } // GetK returns the value of k (which controls the accuracy of the sketch and its memory space usage) func (s *ItemsSketch[C]) GetK() uint16 { return s.k } // GetNumRetained returns the number of quantiles retained by the sketch. func (s *ItemsSketch[C]) GetNumRetained() uint32 { return s.levels[s.numLevels] - s.levels[0] } // GetMinItem returns the minimum item of the stream. This may be distinct from the smallest item retained by the sketch algorithm. func (s *ItemsSketch[C]) GetMinItem() (C, error) { if s.IsEmpty() { return *new(C), fmt.Errorf("operation is undefined for an empty sketch") } return *s.minItem, nil } // GetMaxItem returns the maximum item of the stream. This may be distinct from the largest item retained by the sketch algorithm. func (s *ItemsSketch[C]) GetMaxItem() (C, error) { if s.IsEmpty() { return *new(C), fmt.Errorf("operation is undefined for an empty sketch") } return *s.maxItem, nil } // IsEstimationMode returns true if the sketch is in estimation mode, otherwise false. func (s *ItemsSketch[C]) IsEstimationMode() bool { return s.numLevels > 1 } // GetTotalItemsArray return the serialized byte array of the entire internal items hypothetical structure. // It does not include the preamble, the levels array, or minimum or maximum items. // It may include empty or garbage items. func (s *ItemsSketch[C]) GetTotalItemsArray() []C { if s.n == 0 { return make([]C, s.k) } outArr := make([]C, len(s.items)) copy(outArr, s.items) return outArr } // GetRank return the normalized rank corresponding to the given a quantile. // if INCLUSIVE the given quantile is included into the rank. func (s *ItemsSketch[C]) GetRank(item C, inclusive bool) (float64, error) { if s.IsEmpty() { return 0, fmt.Errorf("operation is undefined for an empty sketch") } err := s.setupSortedView() if err != nil { return 0, err } return s.sortedView.GetRank(item, inclusive) } // GetRanks return an array of normalized ranks corresponding to the given array of quantiles and the given search criterion. // if INCLUSIVE, the given quantiles include the rank directly corresponding to each quantile. func (s *ItemsSketch[C]) GetRanks(item []C, inclusive bool) ([]float64, error) { if s.IsEmpty() { return nil, fmt.Errorf("operation is undefined for an empty sketch") } err := s.setupSortedView() if err != nil { return nil, err } ranks := make([]float64, len(item)) for i := range item { ranks[i], err = s.sortedView.GetRank(item[i], inclusive) if err != nil { return nil, err } } return ranks, nil } // GetQuantile return the approximate quantile of the given normalized rank and the given search criterion. // If INCLUSIVE, the given rank includes all quantiles <= the quantile directly corresponding to the given rank. // If EXCLUSIVE, the given rank includes all quantiles < the quantile directly corresponding to the given rank. func (s *ItemsSketch[C]) GetQuantile(rank float64, inclusive bool) (C, error) { if s.IsEmpty() { return *new(C), fmt.Errorf("operation is undefined for an empty sketch") } if rank < 0.0 || rank > 1.0 { return *new(C), fmt.Errorf("normalized rank cannot be less than zero or greater than 1.0: %f", rank) } err := s.setupSortedView() if err != nil { return *new(C), err } return s.sortedView.GetQuantile(rank, inclusive) } // GetQuantiles return an array of quantiles from the given array of normalized ranks. // if INCLUSIVE, the given ranks include all quantiles <= the quantile directly corresponding to each rank. func (s *ItemsSketch[C]) GetQuantiles(ranks []float64, inclusive bool) ([]C, error) { if s.IsEmpty() { return nil, fmt.Errorf("operation is undefined for an empty sketch") } err := s.setupSortedView() if err != nil { return nil, err } quantiles := make([]C, len(ranks)) for i := range ranks { quantiles[i], err = s.sortedView.GetQuantile(ranks[i], inclusive) if err != nil { return nil, err } } return quantiles, nil } // GetPMF returns an approximation to the Probability Mass Function (PMF) of the input stream // as an array of probability masses as doubles on the interval [0.0, 1.0], given a set of splitPoints. // // The resulting approximations have a probabilistic guarantee that can be obtained from the // getNormalizedRankError(true) function.</p> // // - splitPoints an array of m unique, monotonically increasing items // (of the same type as the input items) // that divide the item input domain into <i>m+1</i> consecutive, non-overlapping intervals. // // Each interval except for the end intervals starts with a split point and ends with the next split // point in sequence. // // The first interval starts below the lowest item retained by the sketch // corresponding to a zero rank or zero probability, and ends with the first split point</p> // // The last (m+1)th interval starts with the last split point and ends after the last // item retained by the sketch corresponding to a rank or probability of 1.0. // // The sum of the probability masses of all (m+1) intervals is 1.0. // // If the search criterion is: // // - INCLUSIVE, and the upper split point of an interval equals an item retained by the sketch, the interval // will include that item. If the lower split point equals an item retained by the sketch, the interval will exclude // that item. // // - EXCLUSIVE, and the upper split point of an interval equals an item retained by the sketch, the interval // will exclude that item. If the lower split point equals an item retained by the sketch, the interval will include // that item. // // It is not recommended to include either the minimum or maximum items of the input stream. func (s *ItemsSketch[C]) GetPMF(splitPoints []C, inclusive bool) ([]float64, error) { if s.IsEmpty() { return nil, fmt.Errorf("operation is undefined for an empty sketch") } err := s.setupSortedView() if err != nil { return nil, err } return s.sortedView.GetPMF(splitPoints, inclusive) } // GetCDF returns an approximation to the Cumulative Distribution Function (CDF) of the input stream // as a monotonically increasing array of double ranks (or cumulative probabilities) on the interval [0.0, 1.0], // given a set of splitPoints. // // The resulting approximations have a probabilistic guarantee that can be obtained from the // getNormalizedRankError(false) function. // // - splitPoints an array of <i>m</i> unique, monotonically increasing items // (of the same type as the input items) // that divide the item input domain into <i>m+1</i> overlapping intervals. // // The start of each interval is below the lowest item retained by the sketch // corresponding to a zero rank or zero probability, and the end of the interval // is the rank or cumulative probability corresponding to the split point. // // The (m+1)th interval represents 100% of the distribution represented by the sketch // and consistent with the definition of a cumulative probability distribution, thus the (m+1)th // rank or probability in the returned array is always 1.0. // // If a split point exactly equals a retained item of the sketch and the search criterion is: // // - INCLUSIVE, the resulting cumulative probability will include that item. // - EXCLUSIVE, the resulting cumulative probability will not include the weight of that split point. // // It is not recommended to include either the minimum or maximum items of the input stream. func (s *ItemsSketch[C]) GetCDF(splitPoints []C, inclusive bool) ([]float64, error) { if s.IsEmpty() { return nil, fmt.Errorf("operation is undefined for an empty sketch") } err := s.setupSortedView() if err != nil { return nil, err } return s.sortedView.GetCDF(splitPoints, inclusive) } // GetNormalizedRankError return the approximate rank error of this sketch normalized as a fraction between zero and one. // The epsilon returned is a best fit to 99 percent confidence empirically measured max error // in thousands of trials. // = pmf if true, returns the "double-sided" normalized rank error for the getPMF() function. // Otherwise, it is the "single-sided" normalized rank error for all the other queries. // @return if pmf is true, returns the "double-sided" normalized rank error for the getPMF() function. // Otherwise, it is the "single-sided" normalized rank error for all the other queries. func (s *ItemsSketch[C]) GetNormalizedRankError(pmf bool) float64 { return getNormalizedRankError(s.minK, pmf) } // GetPartitionBoundaries returns an instance of ItemsSketchPartitionBoundaries // which provides sufficient information for the user to create the given number of equally sized partitions, // where "equally sized" refers to an approximately equal number of items per partition. // // - numEquallySized an integer that specifies the number of equally sized partitions between getMinItem() and // getMaxItem(). // This must be a positive integer greater than zero. // // A 1 will return: minItem, maxItem. // A 2 will return: minItem, median quantile, maxItem. // Etc. // // - searchCrit // If INCLUSIVE, all the returned quantiles are the upper boundaries of the equally sized partitions // except for the lowest returned quantile, which is the lowest boundary of the lowest ranked partition. // If EXCLUSIVE, all the returned quantiles are the lower boundaries of the equally sized partitions // except for the highest returned quantile, which is the upper boundary of the highest ranked partition. func (s *ItemsSketch[C]) GetPartitionBoundaries(numEquallySized int, inclusive bool) (*ItemsSketchPartitionBoundaries[C], error) { if s.IsEmpty() { return nil, fmt.Errorf("operation is undefined for an empty sketch") } err := s.setupSortedView() if err != nil { return nil, err } return s.sortedView.GetPartitionBoundaries(numEquallySized, inclusive) } // GetSortedView return the sorted view of this sketch. func (s *ItemsSketch[C]) GetSortedView() (*ItemsSketchSortedView[C], error) { if s.IsEmpty() { return nil, fmt.Errorf("operation is undefined for an empty sketch") } err := s.setupSortedView() if err != nil { return nil, err } return s.sortedView, nil } // Update this sketch with the given item. func (s *ItemsSketch[C]) Update(item C) { s.updateItem(item, s.compareFn) s.sortedView = nil } // Merge the given sketch into this sketch. func (s *ItemsSketch[C]) Merge(other *ItemsSketch[C]) { if other.IsEmpty() { return } s.mergeItemsSketch(other) s.sortedView = nil } // Reset this sketch to the empty state. func (s *ItemsSketch[C]) Reset() { s.n = 0 s.isLevelZeroSorted = false s.numLevels = 1 s.levels = []uint32{uint32(s.k), uint32(s.k)} s.minItem = nil s.maxItem = nil s.items = make([]C, s.k) s.sortedView = nil } // ToSlice returns the serialized byte array of this sketch. func (s *ItemsSketch[C]) ToSlice() ([]byte, error) { if s.serde == nil { return nil, fmt.Errorf("no SerDe provided") } srcN := s.n var tgtStructure = _COMPACT_FULL if srcN == 0 { tgtStructure = _COMPACT_EMPTY } else if srcN == 1 { tgtStructure = _COMPACT_SINGLE } totalBytes, err := s.currentSerializedSizeBytes() if err != nil { return nil, err } bytesOut := make([]byte, totalBytes) //ints 0,1 preInts := byte(tgtStructure.getPreInts()) serVer := byte(tgtStructure.getSerVer()) famId := byte(internal.FamilyEnum.Kll.Id) flags := byte(0) if s.IsEmpty() { flags |= _EMPTY_BIT_MASK } if s.isLevelZeroSorted { flags |= _LEVEL_ZERO_SORTED_BIT_MASK } if s.n == 1 { flags |= _SINGLE_ITEM_BIT_MASK } k := uint16(s.k) m := uint8(s.m) bytesOut[0] = preInts bytesOut[1] = serVer bytesOut[2] = famId bytesOut[3] = flags binary.LittleEndian.PutUint16(bytesOut[4:6], k) bytesOut[6] = m if tgtStructure == _COMPACT_EMPTY { return bytesOut, nil } if tgtStructure == _COMPACT_SINGLE { siByteArr, err := s.getSingleItemByteArr() if err != nil { return nil, err } copy(bytesOut[_DATA_START_ADR_SINGLE_ITEM:], siByteArr) return bytesOut, nil } // Tgt is either COMPACT_FULL or UPDATABLE //ints 2,3 n := s.n //ints 4 minK := uint16(s.minK) numLevels := uint8(s.numLevels) //end of full preamble lvlsArr := s.getLevelsArray() minMaxByteArr := s.getMinMaxByteArr() itemsByteArr := s.getRetainedItemsByteArr() binary.LittleEndian.PutUint64(bytesOut[8:16], n) binary.LittleEndian.PutUint16(bytesOut[16:18], minK) bytesOut[18] = numLevels for i := uint8(0); i < numLevels; i++ { binary.LittleEndian.PutUint32(bytesOut[_DATA_START_ADR+i*4:], lvlsArr[i]) } copy(bytesOut[_DATA_START_ADR+(numLevels*4):], minMaxByteArr) copy(bytesOut[_DATA_START_ADR+int(numLevels*4)+len(minMaxByteArr):], itemsByteArr) return bytesOut, nil } // GetSerializedSizeBytes Returns the current number of bytes this Sketch would require if serialized in compact form. func (s *ItemsSketch[C]) GetSerializedSizeBytes() (int, error) { if s.serde == nil { return 0, fmt.Errorf("no SerDe provided") } return s.currentSerializedSizeBytes() } // GetIterator returns the iterator for this sketch, which is not sorted. func (s *ItemsSketch[C]) GetIterator() *ItemsSketchIterator[C] { return newItemsSketchIterator[C]( s.GetTotalItemsArray(), s.getLevelsArray(), s.getNumLevels(), ) } // // Private methods // func (s *ItemsSketch[C]) currentSerializedSizeBytes() (int, error) { srcN := s.n var tgtStructure = _COMPACT_FULL if srcN == 0 { tgtStructure = _COMPACT_EMPTY } else if srcN == 1 { tgtStructure = _COMPACT_SINGLE } totalBytes := 0 if tgtStructure == _COMPACT_EMPTY { totalBytes = _N_LONG_ADR } else if tgtStructure == _COMPACT_SINGLE { v, err := s.getSingleItemSizeBytes() if err != nil { return 0, err } totalBytes = _DATA_START_ADR_SINGLE_ITEM + v } else if tgtStructure == _COMPACT_FULL { totalBytes = _DATA_START_ADR + s.getLevelsArrSizeBytes(tgtStructure) + s.getMinMaxSizeBytes() + s.getRetainedItemsSizeBytes() } else { //structure = UPDATABLE return 0, fmt.Errorf("updatable serialization not implemented") } return totalBytes, nil } func (s *ItemsSketch[C]) getNumLevels() int { return len(s.levels) - 1 } func (s *ItemsSketch[C]) getLevelsArray() []uint32 { levels := make([]uint32, len(s.levels)) copy(levels, s.levels) return levels } func (s *ItemsSketch[C]) getLevelsArrSizeBytes(structure sketchStructure) int { if structure == _UPDATABLE { return len(s.levels) * 4 // * Integer.BYTES } else if structure == _COMPACT_FULL { return (len(s.levels) - 1) * 4 // // * Integer.BYTES } else { return 0 } } func (s *ItemsSketch[C]) getMinMaxSizeBytes() int { return s.serde.SizeOf(*s.minItem) + s.serde.SizeOf(*s.maxItem) } func (s *ItemsSketch[C]) getMinMaxByteArr() []byte { minBytes := s.serde.SerializeOneToSlice(*s.minItem) maxBytes := s.serde.SerializeOneToSlice(*s.maxItem) minMaxBytes := make([]byte, len(minBytes)+len(maxBytes)) copy(minMaxBytes, minBytes) copy(minMaxBytes[len(minBytes):], maxBytes) return minMaxBytes } func (s *ItemsSketch[C]) getSingleItemSizeBytes() (int, error) { v, err := s.getSingleItem() if err != nil { return 0, err } return s.serde.SizeOf(v), nil } func (s *ItemsSketch[C]) getSingleItemByteArr() ([]byte, error) { v, err := s.getSingleItem() if err != nil { return nil, err } return s.serde.SerializeOneToSlice(v), nil } func (s *ItemsSketch[C]) getSingleItem() (C, error) { if s.n != 1 { return *new(C), fmt.Errorf("sketch must have exactly one item") } return s.items[s.k-1], nil } func (s *ItemsSketch[C]) getRetainedItemsArray() []C { numRet := s.GetNumRetained() outArr := make([]C, numRet) copy(outArr, s.items[s.levels[0]:]) return outArr } func (s *ItemsSketch[C]) getRetainedItemsByteArr() []byte { retArr := s.getRetainedItemsArray() return s.serde.SerializeManyToSlice(retArr) } func (s *ItemsSketch[C]) getRetainedItemsSizeBytes() int { return len(s.getRetainedItemsByteArr()) } func (s *ItemsSketch[C]) setupSortedView() error { if s.sortedView == nil { sView, err := newItemsSketchSortedView[C](s) if err != nil { return err } s.sortedView = sView } return nil } func (s *ItemsSketch[C]) updateItem(item C, compareFn common.CompareFn[C]) { if internal.IsNil(item) { return } if s.IsEmpty() { s.minItem = &item s.maxItem = &item } else { if compareFn(item, *s.minItem) { s.minItem = &item } if compareFn(*s.maxItem, item) { s.maxItem = &item } } level0space := s.levels[0] if level0space == 0 { s.compressWhileUpdatingSketch() level0space = s.levels[0] } s.n++ s.isLevelZeroSorted = false nextPos := level0space - 1 s.levels[0] = nextPos s.items[nextPos] = item } func (s *ItemsSketch[C]) mergeItemsSketch(other *ItemsSketch[C]) { if other.IsEmpty() { return } // capture my key mutable fields before doing any merging myEmpty := s.IsEmpty() var myMin, myMax C var err error if !myEmpty { myMin, err = s.GetMinItem() if err != nil { panic(err) } myMax, err = s.GetMaxItem() if err != nil { panic(err) } } myMinK := s.minK finalN := s.n + other.n // buffers that are referenced multiple times otherNumLevels := other.numLevels otherLevelsArr := other.levels var otherItemsArr []C // MERGE: update this sketch with level0 items from the other sketch otherItemsArr = other.GetTotalItemsArray() for i := otherLevelsArr[0]; i < otherLevelsArr[1]; i++ { s.updateItem(otherItemsArr[i], s.compareFn) } // After the level 0 update, we capture the intermediate state of levels and items arrays... myCurNumLevels := s.numLevels myCurLevelsArr := s.levels myCurItemsArr := s.GetTotalItemsArray() // then rename them and initialize in case there are no higher levels myNewNumLevels := myCurNumLevels myNewLevelsArr := myCurLevelsArr myNewItemsArr := myCurItemsArr //merge higher levels if they exist if otherNumLevels > 1 { tmpSpaceNeeded := s.GetNumRetained() + getNumRetainedAboveLevelZero(otherNumLevels, otherLevelsArr) workbuf := make([]C, tmpSpaceNeeded) ub := ubOnNumLevels(finalN) worklevels := make([]uint32, ub+2) // ub+1 does not work outlevels := make([]uint32, ub+2) provisionalNumLevels := max(myCurNumLevels, otherNumLevels) populateItemWorkArrays(workbuf, worklevels, provisionalNumLevels, myCurNumLevels, myCurLevelsArr, myCurItemsArr, otherNumLevels, otherLevelsArr, otherItemsArr, s.compareFn) // notice that workbuf is being used as both the input and output result := generalItemsCompress(s.k, s.m, provisionalNumLevels, workbuf, worklevels, workbuf, outlevels, s.isLevelZeroSorted, s.compareFn, s.deterministicOffsetForTest) targetItemCount := result[1] //was finalCapacity. Max size given k, m, numLevels curItemCount := result[2] //was finalPop // now we need to finalize the results for mySketch //THE NEW NUM LEVELS myNewNumLevels = uint8(result[0]) // THE NEW ITEMS ARRAY if int(targetItemCount) == len(myCurItemsArr) { myNewItemsArr = myCurItemsArr } else { myNewItemsArr = make([]C, targetItemCount) } freeSpaceAtBottom := targetItemCount - curItemCount //shift the new items array create space at bottom for i := uint32(0); i < uint32(curItemCount); i++ { myNewItemsArr[uint32(freeSpaceAtBottom)+i] = workbuf[outlevels[0]+i] } theShift := uint32(freeSpaceAtBottom) - outlevels[0] //calculate the new levels array length var finalLevelsArrLen uint32 if uint32(len(myCurLevelsArr)) < uint32(myNewNumLevels+1) { finalLevelsArrLen = uint32(myNewNumLevels + 1) } else { finalLevelsArrLen = uint32(len(myCurLevelsArr)) } //THE NEW LEVELS ARRAY myNewLevelsArr = make([]uint32, finalLevelsArrLen) for lvl := uint8(0); lvl < myNewNumLevels+1; lvl++ { // includes the "extra" index myNewLevelsArr[lvl] = outlevels[lvl] + theShift } //MEMORY SPACE MANAGEMENT //not used } // Update Preamble: s.n = finalN if other.IsEstimationMode() { //otherwise the merge brings over exact items. s.minK = min(myMinK, other.minK) } // Update numLevels, levelsArray, items s.numLevels = myNewNumLevels s.levels = myNewLevelsArr s.items = myNewItemsArr // Update min, max items if myEmpty { s.minItem = other.minItem s.maxItem = other.maxItem } else { if s.compareFn(myMin, *other.minItem) { s.minItem = &myMin } else { s.minItem = other.minItem } if s.compareFn(*other.maxItem, myMax) { s.maxItem = &myMax } else { s.maxItem = other.maxItem } } } func (s *ItemsSketch[C]) compressWhileUpdatingSketch() { level := findLevelToCompact(s.k, s.m, s.numLevels, s.levels) if level == s.numLevels-1 { //The level to compact is the top level, thus we need to add a level. //Be aware that this operation grows the items array, //shifts the items data and the level boundaries of the data, //and grows the levels array and increments numLevels_. s.addEmptyTopLevelToCompletelyFullSketch() } myLevelsArr := s.levels rawBeg := myLevelsArr[level] rawEnd := myLevelsArr[level+1] // +2 is OK because we already added a new top level if necessary popAbove := myLevelsArr[level+2] - rawEnd rawPop := rawEnd - rawBeg oddPop := rawPop%2 == 1 adjBeg := rawBeg if oddPop { adjBeg++ } adjPop := rawPop if oddPop { adjPop-- } halfAdjPop := adjPop / 2 //the following is specific to generic Items myItemsArr := s.GetTotalItemsArray() if level == 0 { // level zero might not be sorted, so we must sort it if we wish to compact it tmpSlice := myItemsArr[adjBeg : adjBeg+adjPop] sort.Slice(tmpSlice, func(a, b int) bool { return s.compareFn(tmpSlice[a], tmpSlice[b]) }) } if popAbove == 0 { randomlyHalveUpItems(myItemsArr, adjBeg, adjPop, s.deterministicOffsetForTest) } else { randomlyHalveDownItems(myItemsArr, adjBeg, adjPop, s.deterministicOffsetForTest) mergeSortedItemsArrays( myItemsArr, adjBeg, halfAdjPop, myItemsArr, rawEnd, popAbove, myItemsArr, adjBeg+halfAdjPop, s.compareFn) } newIndex := myLevelsArr[level+1] - halfAdjPop // adjust boundaries of the level above s.levels[level+1] = newIndex if oddPop { s.levels[level] = myLevelsArr[level+1] - 1 // the current level now contains one item myItemsArr[myLevelsArr[level]] = myItemsArr[rawBeg] // namely this leftover guy } else { s.levels[level] = myLevelsArr[level+1] // the current level is now empty } if level > 0 { amount := rawBeg - myLevelsArr[0] // adjust boundary for i := amount; i > 0; i-- { // Start from the end as we are shifting to the right, // failing to do so will corrupt the items array. tgtInx := myLevelsArr[0] + halfAdjPop + i - 1 stcInx := myLevelsArr[0] + i - 1 myItemsArr[tgtInx] = myItemsArr[stcInx] } } for lvl := uint8(0); lvl < level; lvl++ { newIndex = myLevelsArr[lvl] + halfAdjPop //adjust boundary s.levels[lvl] = newIndex } s.items = myItemsArr } func (s *ItemsSketch[C]) addEmptyTopLevelToCompletelyFullSketch() { myCurLevelsArr := s.getLevelsArray() myCurNumLevels := s.numLevels myCurTotalItemsCapacity := myCurLevelsArr[myCurNumLevels] myCurItemsArr := s.GetTotalItemsArray() minItem := s.minItem maxItem := s.maxItem deltaItemsCap := levelCapacity(s.k, myCurNumLevels+1, 0, s.m) myNewTotalItemsCapacity := myCurTotalItemsCapacity + deltaItemsCap // Check if growing the levels arr if required. // Note that merging MIGHT over-grow levels_, in which case we might not have to grow it growLevelsArr := len(myCurLevelsArr) < int(myCurNumLevels+2) var ( myNewLevelsArr []uint32 myNewNumLevels uint8 ) //myNewLevelsArr := make([]uint32, myCurNumLevels+2) // GROW LEVELS ARRAY if growLevelsArr { //grow levels arr by one and copy the old data to the new array, extra space at the top. myNewLevelsArr = make([]uint32, myCurNumLevels+2) copy(myNewLevelsArr, myCurLevelsArr) myNewNumLevels = myCurNumLevels + 1 s.numLevels++ //increment for off-heap } else { myNewLevelsArr = myCurLevelsArr myNewNumLevels = myCurNumLevels } // This loop updates all level indices EXCLUDING the "extra" index at the top for level := uint8(0); level <= myNewNumLevels-1; level++ { myNewLevelsArr[level] += deltaItemsCap } myNewLevelsArr[myNewNumLevels] = myNewTotalItemsCapacity // initialize the new "extra" index at the top // GROW items ARRAY myNewItemsArr := make([]C, myNewTotalItemsCapacity) for i := uint32(0); i < myCurTotalItemsCapacity; i++ { myNewItemsArr[i+deltaItemsCap] = myCurItemsArr[i] } // update our sketch with new expanded spaces s.numLevels = myNewNumLevels s.levels = myNewLevelsArr s.minItem = minItem s.maxItem = maxItem s.items = myNewItemsArr } func findLevelToCompact(k uint16, m uint8, numLevels uint8, levels []uint32) uint8 { level := uint8(0) for { pop := levels[level+1] - levels[level] capacity := levelCapacity(k, numLevels, level, m) if pop >= capacity { return level } level++ } } func computeTotalItemCapacity(k uint16, m uint8, numLevels uint8) uint32 { var total uint32 = 0 for level := uint8(0); level < numLevels; level++ { total += levelCapacity(k, numLevels, level, m) } return total } func levelCapacity(k uint16, numLevels uint8, level uint8, m uint8) uint32 { depth := numLevels - level - 1 return max(uint32(m), intCapAux(k, depth)) } func intCapAux(k uint16, depth uint8) uint32 { if depth <= 30 { return intCapAuxAux(k, depth) } half := depth / 2 rest := depth - half tmp := intCapAuxAux(k, half) return intCapAuxAux(uint16(tmp), rest) } func intCapAuxAux(k uint16, depth uint8) uint32 { twok := uint64(k << 1) // for rounding at the end, pre-multiply by 2 here, divide by 2 during rounding. tmp := (twok << depth) / powersOfThree[depth] //2k* (2/3)^depth. 2k also keeps the fraction larger. result := (tmp + 1) >> 1 // (tmp + 1)/2. If odd, round up. This guarantees an integer. if result <= uint64(k) { return uint32(result) } return uint32(k) } func randomlyHalveUpItems[C comparable](buf []C, start uint32, length uint32, deterministicOffsetForTest bool) { halfLength := length / 2 offset := rand.Intn(2) if deterministicOffsetForTest { offset = deterministicOffset() } j := (start + length) - 1 - uint32(offset) for i := (start + length) - 1; i >= (start + halfLength); i-- { buf[i] = buf[j] j -= 2 } } func randomlyHalveDownItems[C comparable](buf []C, start uint32, length uint32, deterministicOffsetForTest bool) { halfLength := length / 2 offset := rand.Intn(2) if deterministicOffsetForTest { offset = deterministicOffset() } j := start + uint32(offset) for i := start; i < (start + halfLength); i++ { buf[i] = buf[j] j += 2 } } func mergeSortedItemsArrays[C comparable](bufA []C, startA uint32, lenA uint32, bufB []C, startB uint32, lenB uint32, bufC []C, startC uint32, compareFn common.CompareFn[C]) { lenC := lenA + lenB limA := startA + lenA limB := startB + lenB limC := startC + lenC a := startA b := startB for c := startC; c < limC; c++ { if a == limA { bufC[c] = bufB[b] b++ } else if b == limB { bufC[c] = bufA[a] a++ } else if compareFn(bufA[a], bufB[b]) { bufC[c] = bufA[a] a++ } else { bufC[c] = bufB[b] b++ } } } func populateItemWorkArrays[C comparable](workbuf []C, worklevels []uint32, provisionalNumLevels uint8, myCurNumLevels uint8, myCurLevelsArr []uint32, myCurItemsArr []C, otherNumLevels uint8, otherLevelsArr []uint32, otherItemsArr []C, compareFn common.CompareFn[C]) { worklevels[0] = 0 // Note: the level zero data from "other" was already inserted into "self" selfPopZero := currentLevelSizeItems(0, myCurNumLevels, myCurLevelsArr) for i := uint32(0); i < selfPopZero; i++ { workbuf[worklevels[0]+i] = myCurItemsArr[myCurLevelsArr[0]+i] } worklevels[1] = worklevels[0] + selfPopZero for lvl := uint8(1); lvl < provisionalNumLevels; lvl++ { selfPop := currentLevelSizeItems(lvl, myCurNumLevels, myCurLevelsArr) otherPop := currentLevelSizeItems(lvl, otherNumLevels, otherLevelsArr) worklevels[lvl+1] = worklevels[lvl] + selfPop + otherPop if selfPop > 0 && otherPop == 0 { for i := uint32(0); i < selfPop; i++ { workbuf[worklevels[lvl]+i] = myCurItemsArr[myCurLevelsArr[lvl]+i] } } else if selfPop == 0 && otherPop > 0 { for i := uint32(0); i < otherPop; i++ { workbuf[worklevels[lvl]+i] = otherItemsArr[otherLevelsArr[lvl]+i] } } else if selfPop > 0 && otherPop > 0 { mergeSortedItemsArrays( myCurItemsArr, myCurLevelsArr[lvl], selfPop, otherItemsArr, otherLevelsArr[lvl], otherPop, workbuf, worklevels[lvl], compareFn) } } } func generalItemsCompress[C comparable]( k uint16, m uint8, numLevelsIn uint8, inBuf []C, inLevels []uint32, outBuf []C, outLevels []uint32, isLevelZeroSorted bool, compareFn common.CompareFn[C], deterministicOffsetForTest bool, ) []uint32 { numLevels := numLevelsIn currentItemCount := inLevels[numLevels] - inLevels[0] // decreases with each compaction targetItemCount := computeTotalItemCapacity(k, m, numLevels) // increases if we add levels doneYet := false outLevels[0] = 0 curLevel := -1 for !doneYet { curLevel++ // start out at level 0 // If we are at the current top level, add an empty level above it for convenience, // but do not increment numLevels until later if curLevel == (int(numLevels) - 1) { inLevels[curLevel+2] = inLevels[curLevel+1] } rawBeg := inLevels[curLevel] rawLim := inLevels[curLevel+1] rawPop := rawLim - rawBeg if (uint32(currentItemCount) < targetItemCount) || (rawPop < levelCapacity(k, numLevels, uint8(curLevel), m)) { for i := uint32(0); i < rawPop; i++ { outBuf[outLevels[curLevel]+i] = inBuf[rawBeg+i] } outLevels[curLevel+1] = outLevels[curLevel] + rawPop } else { // The sketch is too full AND this level is too full, so we compact it // Note: this can add a level and thus change the sketch's capacity popAbove := inLevels[curLevel+2] - rawLim oddPop := rawPop%2 == 1 adjBeg := rawBeg if oddPop { adjBeg++ } adjPop := rawPop if oddPop { adjPop-- } halfAdjPop := adjPop / 2 if oddPop { outBuf[outLevels[curLevel]] = inBuf[rawBeg] outLevels[curLevel+1] = outLevels[curLevel] + 1 } else { outLevels[curLevel+1] = outLevels[curLevel] } // level zero might not be sorted, so we must sort it if we wish to compact it if (curLevel == 0) && !isLevelZeroSorted { tmpSlice := inBuf[adjBeg : adjBeg+adjPop] sort.Slice(tmpSlice, func(a, b int) bool { return compareFn(tmpSlice[a], tmpSlice[b]) }) } if popAbove == 0 { randomlyHalveUpItems(inBuf, adjBeg, adjPop, deterministicOffsetForTest) } else { randomlyHalveDownItems(inBuf, adjBeg, adjPop, deterministicOffsetForTest) mergeSortedItemsArrays( inBuf, adjBeg, halfAdjPop, inBuf, rawLim, popAbove, inBuf, adjBeg+halfAdjPop, compareFn) } // track the fact that we just eliminated some data currentItemCount -= halfAdjPop // Adjust the boundaries of the level above inLevels[curLevel+1] = inLevels[curLevel+1] - halfAdjPop // Increment numLevels if we just compacted the old top level // This creates some more capacity (the size of the new bottom level) if curLevel == (int(numLevels) - 1) { numLevels++ targetItemCount += levelCapacity(k, numLevels, 0, m) } } // end of code for compacting a level // determine whether we have processed all levels yet (including any new levels that we created) if curLevel == (int(numLevels) - 1) { doneYet = true } } // end of loop over levels return []uint32{uint32(numLevels), targetItemCount, currentItemCount} } func deterministicOffset() int { result := nextOffsetForTest nextOffsetForTest = 1 - nextOffsetForTest return result }