frequencies/items_sketch.go (345 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package frequencies is dedicated to streaming algorithms that enable estimation of the
// frequency of occurrence of items in a weighted multiset stream of items.
// If the frequency distribution of items is sufficiently skewed, these algorithms are very
// useful in identifying the "Heavy Hitters" that occurred most frequently in the stream.
// The accuracy of the estimation of the frequency of an item has well understood error
// bounds that can be returned by the sketch.
//
// These algorithms are sometimes referred to as "TopN" algorithms.
package frequencies
import (
"encoding/binary"
"errors"
"fmt"
"github.com/apache/datasketches-go/common"
"github.com/apache/datasketches-go/internal"
"sort"
"strconv"
"strings"
)
type ItemsSketch[C comparable] struct {
// Log2 Maximum length of the arrays internal to the hash map supported by the data
// structure.
lgMaxMapSize int
// The current number of counters supported by the hash map.
curMapCap int //the threshold to purge
// Tracks the total of decremented counts.
offset int64
// The sum of all frequencies of the stream so far.
streamWeight int64
// The maximum number of samples used to compute approximate median of counters when doing
// decrement
sampleSize int
// Hash map mapping stored items to approximate counts
hashMap *reversePurgeItemHashMap[C]
}
// NewFrequencyItemsSketch constructs a new ItemsSketch with the given parameters.
// this internal constructor is used when deserializing the sketch.
//
// - lgMaxMapSize, log2 of the physical size of the internal hash map managed by this
// sketch. The maximum capacity of this internal hash map is 0.75 times 2^lgMaxMapSize.
// Both the ultimate accuracy and size of this sketch are functions of lgMaxMapSize.
// - lgCurMapSize, log2 of the starting (current) physical size of the internal hashFn
// map managed by this sketch.
func NewFrequencyItemsSketch[C comparable](lgMaxMapSize int, lgCurMapSize int, hasher common.ItemSketchHasher[C], serde common.ItemSketchSerde[C]) (*ItemsSketch[C], error) {
lgMaxMapSz := max(lgMaxMapSize, _LG_MIN_MAP_SIZE)
lgCurMapSz := max(lgCurMapSize, _LG_MIN_MAP_SIZE)
hashMap, err := newReversePurgeItemHashMap[C](1<<lgCurMapSz, hasher, serde)
if err != nil {
return nil, err
}
curMapCap := hashMap.getCapacity()
maxMapCap := int(float64(uint64(1)<<lgMaxMapSize) * reversePurgeItemHashMapLoadFactor)
offset := int64(0)
sampleSize := min(_SAMPLE_SIZE, maxMapCap)
return &ItemsSketch[C]{
lgMaxMapSize: lgMaxMapSz,
curMapCap: curMapCap,
offset: offset,
sampleSize: sampleSize,
hashMap: hashMap,
}, nil
}
// NewFrequencyItemsSketchWithMaxMapSize constructs a new ItemsSketch with the given maxMapSize and the default
// initialMapSize (8).
//
// - maxMapSize, Determines the physical size of the internal hash map managed by this
// sketch and must be a power of 2. The maximum capacity of this internal hash map is
// 0.75 times * maxMapSize. Both the ultimate accuracy and size of this sketch are
// functions of maxMapSize.
func NewFrequencyItemsSketchWithMaxMapSize[C comparable](maxMapSize int, hasher common.ItemSketchHasher[C], serde common.ItemSketchSerde[C]) (*ItemsSketch[C], error) {
maxMapSz, err := internal.ExactLog2(maxMapSize)
if err != nil {
return nil, err
}
return NewFrequencyItemsSketch[C](maxMapSz, _LG_MIN_MAP_SIZE, hasher, serde)
}
// NewFrequencyItemsSketchFromSlice constructs a new ItemsSketch with the given maxMapSize and the
// default initialMapSize (8).
//
// maxMapSize determines the physical size of the internal hashmap managed by this
// sketch and must be a power of 2. The maximum capacity of this internal hash map is
// 0.75 times * maxMapSize. Both the ultimate accuracy and size of this sketch are a
// function of maxMapSize.
func NewFrequencyItemsSketchFromSlice[C comparable](slc []byte, hasher common.ItemSketchHasher[C], serde common.ItemSketchSerde[C]) (*ItemsSketch[C], error) {
if serde == nil {
return nil, errors.New("no SerDe provided")
}
pre0, err := checkPreambleSize(slc) //make sure preamble will fit
maxPreLongs := internal.FamilyEnum.Frequency.MaxPreLongs
preLongs := extractPreLongs(pre0) //Byte 0
serVer := extractSerVer(pre0) //Byte 1
familyID := extractFamilyID(pre0) //Byte 2
lgMaxMapSize := extractLgMaxMapSize(pre0) //Byte 3
lgCurMapSize := extractLgCurMapSize(pre0) //Byte 4
empty := (extractFlags(pre0) & _EMPTY_FLAG_MASK) != 0 //Byte 5
// Checks
preLongsEq1 := (preLongs == 1) //Byte 0
preLongsEqMax := (preLongs == maxPreLongs)
if !preLongsEq1 && !preLongsEqMax {
return nil, fmt.Errorf("possible corruption: preLongs must be 1 or %d: %d", maxPreLongs, preLongs)
}
if serVer != _SER_VER { //Byte 1
return nil, fmt.Errorf("possible corruption: ser ver must be %d: %d", _SER_VER, serVer)
}
actFamID := internal.FamilyEnum.Frequency.Id //Byte 2
if familyID != actFamID {
return nil, fmt.Errorf("possible corruption: familyID must be %d: %d", actFamID, familyID)
}
if empty && !preLongsEq1 { //Byte 5 and Byte 0
return nil, fmt.Errorf("(preLongs == 1) ^ empty == true")
}
if empty {
return NewFrequencyItemsSketchWithMaxMapSize[C](1<<_LG_MIN_MAP_SIZE, hasher, serde)
}
// Get full preamble
preArr := make([]int64, preLongs)
for j := 0; j < preLongs; j++ {
preArr[j] = int64(binary.LittleEndian.Uint64(slc[j<<3:]))
}
fis, err := NewFrequencyItemsSketch[C](lgMaxMapSize, lgCurMapSize, hasher, serde)
if err != nil {
return nil, err
}
fis.streamWeight = 0 // update after
fis.offset = preArr[3]
preBytes := preLongs << 3
activeItems := extractActiveItems(preArr[1])
// Get countArray
countArray := make([]int64, activeItems)
reqBytes := preBytes + activeItems*8 // count Arr only
if len(slc) < reqBytes {
return nil, fmt.Errorf("possible Corruption: Insufficient bytes in array: %d, %d", len(slc), reqBytes)
}
for j := 0; j < activeItems; j++ {
countArray[j] = int64(binary.LittleEndian.Uint64(slc[preBytes+j<<3:]))
}
// Get itemArray
itemsOffset := preBytes + (8 * activeItems)
itemArray, err := serde.DeserializeManyFromSlice(slc[itemsOffset:], 0, activeItems)
if err != nil {
return nil, err
}
// update the sketch
for j := 0; j < activeItems; j++ {
err := fis.UpdateMany(itemArray[j], countArray[j])
if err != nil {
return nil, err
}
}
fis.streamWeight = preArr[2] // override streamWeight due to updating
return fis, nil
}
// GetAprioriErrorFrequencyItemsSketch returns the estimated a priori error given the maxMapSize for the sketch and the
// estimatedTotalStreamWeight.
//
// maxMapSize is the planned map size to be used when constructing this sketch.
// estimatedTotalStreamWeight is the estimated total stream weight.
func GetAprioriErrorFrequencyItemsSketch(maxMapSize int, estimatedTotalStreamWeight int64) (float64, error) {
epsilon, err := GetEpsilonLongsSketch(maxMapSize)
if err != nil {
return 0, err
}
return epsilon * float64(estimatedTotalStreamWeight), nil
}
// GetEpsilonFrequencyItemsSketch returns epsilon used to compute a priori error.
// This is just the value 3.5 / maxMapSize.
//
// maxMapSize is the planned map size to be used when constructing this sketch.
func GetEpsilonFrequencyItemsSketch(maxMapSize int) (float64, error) {
if !internal.IsPowerOf2(maxMapSize) {
return 0, errors.New("maxMapSize is not a power of 2")
}
return 3.5 / float64(maxMapSize), nil
}
// GetCurrentMapCapacity returns the current number of counters the sketch is configured to support.
func (i *ItemsSketch[C]) GetCurrentMapCapacity() int {
return i.curMapCap
}
// GetEstimate gets the estimate of the frequency of the given item.
// Note: The true frequency of an item would be the sum of the counts as a result of the
// two update functions.
//
// item is the given item
//
// return the estimate of the frequency of the given item
func (i *ItemsSketch[C]) GetEstimate(item C) (int64, error) {
// If item is tracked:
// Estimate = itemCount + offset; Otherwise it is 0.
v, err := i.hashMap.get(item)
if v > 0 {
return v + i.offset, err
}
return 0, err
}
// GetLowerBound gets the guaranteed lower bound frequency of the given item, which can never be
// negative.
//
// - item, the given item.
func (i *ItemsSketch[C]) GetLowerBound(item C) (int64, error) {
return i.hashMap.get(item)
}
// GetUpperBound gets the guaranteed upper bound frequency of the given item.
//
// - item, the given item.
func (i *ItemsSketch[C]) GetUpperBound(item C) (int64, error) {
// UB = itemCount + offset
v, err := i.hashMap.get(item)
return v + i.offset, err
}
// GetFrequentItemsWithThreshold returns an array of RowItem that include frequent items, estimates, upper and
// lower bounds given a threshold and an ErrorCondition. If the threshold is lower than
// getMaximumError(), then getMaximumError() will be used instead.
//
// The method first examines all active items in the sketch (items that have a counter).
//
// If errorType = NO_FALSE_NEGATIVES, this will include an item in the result list if
// GetUpperBound(item) > threshold. There will be no false negatives, i.e., no Type II error.
// There may be items in the set with true frequencies less than the threshold (false positives).
//
// If errorType = NO_FALSE_POSITIVES, this will include an item in the result list if
// GetLowerBound(item) > threshold. There will be no false positives, i.e., no Type I error.
// There may be items omitted from the set with true frequencies greater than the threshold
// (false negatives). This is a subset of the NO_FALSE_NEGATIVES case.
//
// threshold to include items in the result list
// errorType determines whether no false positives or no false negatives are desired.
// an array of frequent items
func (i *ItemsSketch[C]) GetFrequentItemsWithThreshold(threshold int64, errorType errorType) ([]*RowItem[C], error) {
finalThreshold := i.GetMaximumError()
if threshold > finalThreshold {
finalThreshold = threshold
}
return i.sortItems(finalThreshold, errorType)
}
// GetFrequentItems returns an array of Row that include frequent items, estimates, upper and
// lower bounds given an ErrorCondition and the default threshold.
// This is the same as GetFrequentItemsWithThreshold(getMaximumError(), errorType)
//
// errorType determines whether no false positives or no false negatives are desired.
func (i *ItemsSketch[C]) GetFrequentItems(errorType errorType) ([]*RowItem[C], error) {
return i.sortItems(i.GetMaximumError(), errorType)
}
// GetNumActiveItems returns the number of active items in the sketch.
func (i *ItemsSketch[C]) GetNumActiveItems() int {
return i.hashMap.numActive
}
// GetMaximumError return an upper bound on the maximum error of GetEstimate(item) for any item.
// This is equivalent to the maximum distance between the upper bound and the lower bound
// for any item.
func (i *ItemsSketch[C]) GetMaximumError() int64 {
return i.offset
}
// GetMaximumMapCapacity returns the maximum number of counters the sketch is configured to
// support.
func (i *ItemsSketch[C]) GetMaximumMapCapacity() int {
return int(float64(uint64(1<<i.lgMaxMapSize)) * reversePurgeItemHashMapLoadFactor)
}
// GetStreamLength returns the sum of the frequencies in the stream seen so far by the sketch.
func (i *ItemsSketch[C]) GetStreamLength() int64 {
return i.streamWeight
}
// IsEmpty returns true if this sketch is empty.
func (i *ItemsSketch[C]) IsEmpty() bool {
return i.GetNumActiveItems() == 0
}
// Update this sketch with an item and a frequency count of one.
//
// item for which the frequency should be increased.
func (i *ItemsSketch[C]) Update(item C) error {
return i.UpdateMany(item, 1)
}
// UpdateMany update this sketch with an item and a positive frequency count (or weight).
//
// Item for which the frequency should be increased. The item can be any long value
// and is only used by the sketch to determine uniqueness.
// count the amount by which the frequency of the item should be increased.
// A count of zero is a no-op, and a negative count will throw an exception.
func (i *ItemsSketch[C]) UpdateMany(item C, count int64) error {
if internal.IsNil(item) || count == 0 {
return nil
}
if count < 0 {
return fmt.Errorf("count may not be negative")
}
i.streamWeight += count
err := i.hashMap.adjustOrPutValue(item, count)
if err != nil {
return err
}
if i.GetNumActiveItems() > i.curMapCap { //over the threshold, we need to do something
if i.hashMap.lgLength < i.lgMaxMapSize { //below tgt size, we can grow
err := i.hashMap.resize(2 * len(i.hashMap.keys))
if err != nil {
return err
}
i.curMapCap = i.hashMap.getCapacity()
} else {
i.offset += i.hashMap.purge(i.sampleSize)
if i.GetNumActiveItems() > i.GetMaximumMapCapacity() {
return fmt.Errorf("purge did not reduce active items")
}
}
}
return nil
}
// Merge merges the other sketch into this one. The other sketch may be of a different size.
//
// other sketch of this class
//
// return a sketch whose estimates are within the guarantees of the largest error tolerance
// of the two merged sketches.
func (i *ItemsSketch[C]) Merge(other *ItemsSketch[C]) (*ItemsSketch[C], error) {
if other == nil || other.IsEmpty() {
return i, nil
}
streamLen := i.streamWeight + other.streamWeight //capture before merge
iter := other.hashMap.iterator()
for iter.next() {
err := i.UpdateMany(iter.getKey(), iter.getValue())
if err != nil {
return nil, err
}
}
i.offset += other.offset
i.streamWeight = streamLen //corrected streamWeight
return i, nil
}
// ToString returns a String representation of this sketch
func (i *ItemsSketch[C]) ToString() (string, error) {
var sb strings.Builder
//start the string with parameters of the sketch
serVer := _SER_VER //0
famID := internal.FamilyEnum.Frequency.Id
lgMaxMapSz := i.lgMaxMapSize
flags := 0
if i.hashMap.numActive == 0 {
flags = _EMPTY_FLAG_MASK
}
_, err := fmt.Fprintf(&sb, "%d,%d,%d,%d,%d,%d,", serVer, famID, lgMaxMapSz, flags, i.streamWeight, i.offset)
if err != nil {
return "", err
}
sb.WriteString(i.hashMap.serializeToString()) //numActive, curMaplen, key[i], value[i], ...
return sb.String(), nil
}
// ToSlice returns a slice representation of this sketch
func (i *ItemsSketch[C]) ToSlice() ([]byte, error) {
if i.hashMap.serde == nil {
return nil, errors.New("no SerDe provided")
}
preLongs := 0
outBytes := 0
empty := i.IsEmpty()
activeItems := i.GetNumActiveItems()
bytes := make([]byte, 0)
if empty {
preLongs = 1
outBytes = 8
} else {
preLongs = internal.FamilyEnum.Frequency.MaxPreLongs
bytes = i.hashMap.serde.SerializeManyToSlice(i.hashMap.getActiveKeys())
outBytes = ((preLongs + activeItems) << 3) + len(bytes)
}
outArr := make([]byte, outBytes)
pre0 := int64(0)
pre0 = insertPreLongs(int64(preLongs), pre0) //Byte 0
pre0 = insertSerVer(_SER_VER, pre0) //Byte 1
pre0 = insertFamilyID(int64(internal.FamilyEnum.Frequency.Id), pre0) //Byte 2
pre0 = insertLgMaxMapSize(int64(i.lgMaxMapSize), pre0) //Byte 3
pre0 = insertLgCurMapSize(int64(i.hashMap.lgLength), pre0) //Byte 4
if empty {
pre0 = insertFlags(_EMPTY_FLAG_MASK, pre0) //Byte 5
} else {
pre0 = insertFlags(0, pre0) //Byte 5
}
if empty {
binary.LittleEndian.PutUint64(outArr, uint64(pre0))
} else {
pre := int64(0)
preArr := make([]int64, preLongs)
preArr[0] = pre0
preArr[1] = insertActiveItems(int64(activeItems), pre)
preArr[2] = int64(i.streamWeight)
preArr[3] = int64(i.offset)
for j := 0; j < preLongs; j++ {
binary.LittleEndian.PutUint64(outArr[j<<3:], uint64(preArr[j]))
}
preBytes := preLongs << 3
for j := 0; j < activeItems; j++ {
binary.LittleEndian.PutUint64(outArr[preBytes+j<<3:], uint64(i.hashMap.getActiveValues()[j]))
}
copy(outArr[preBytes+(activeItems<<3):], bytes)
}
return outArr, nil
}
// Reset resets this sketch to a virgin state.
func (i *ItemsSketch[C]) Reset() error {
hashMap, err := newReversePurgeItemHashMap[C](1<<_LG_MIN_MAP_SIZE, i.hashMap.hasher, i.hashMap.serde)
if err != nil {
return err
}
i.hashMap = hashMap
i.curMapCap = hashMap.getCapacity()
i.offset = 0
i.streamWeight = 0
return nil
}
func (i *ItemsSketch[C]) String() string {
var sb strings.Builder
sb.WriteString("FrequentItemsSketch:")
sb.WriteString("\n")
sb.WriteString(" Stream Length : " + strconv.FormatInt(i.streamWeight, 10))
sb.WriteString("\n")
sb.WriteString(" Max Error Offset : " + strconv.FormatInt(i.offset, 10))
sb.WriteString("\n")
sb.WriteString(i.hashMap.String())
return sb.String()
}
func (i *ItemsSketch[C]) sortItems(threshold int64, errorType errorType) ([]*RowItem[C], error) {
rowList := make([]*RowItem[C], 0)
iter := i.hashMap.iterator()
if errorType == ErrorTypeEnum.NoFalseNegatives {
for iter.next() {
est, err := i.GetEstimate(iter.getKey())
if err != nil {
return nil, err
}
ub, err := i.GetUpperBound(iter.getKey())
if err != nil {
return nil, err
}
lb, err := i.GetLowerBound(iter.getKey())
if err != nil {
return nil, err
}
if ub >= threshold {
row := newRowItem[C](iter.getKey(), est, ub, lb)
rowList = append(rowList, row)
}
}
} else { //NO_FALSE_POSITIVES
for iter.next() {
est, err := i.GetEstimate(iter.getKey())
if err != nil {
return nil, err
}
ub, err := i.GetUpperBound(iter.getKey())
if err != nil {
return nil, err
}
lb, err := i.GetLowerBound(iter.getKey())
if err != nil {
return nil, err
}
if lb >= threshold {
row := newRowItem[C](iter.getKey(), est, ub, lb)
rowList = append(rowList, row)
}
}
}
sort.Slice(rowList, func(i, j int) bool {
return rowList[i].est > rowList[j].est
})
return rowList, nil
}