frequencies/longs_sketch.go (416 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package frequencies
import (
"encoding/binary"
"errors"
"fmt"
"math/bits"
"sort"
"strconv"
"strings"
"github.com/apache/datasketches-go/internal"
)
// LongsSketch is a frequent-items sketch over int64 items. Approximate per-item
// counts live in a reverse-purge hash map; the amount removed by purges is
// accumulated in offset.
type LongsSketch struct {
	// Log2 of the maximum length of the arrays internal to the hash map supported by
	// the data structure. The map is grown only while its lgLength is below this value.
	lgMaxMapSize int
	// The current number of counters supported by the hash map.
	curMapCap int // once numActive exceeds this, the map is resized or purged
	// Running total of the amounts returned by hashMap.purge (plus the offsets of
	// merged sketches). Added to a stored count to form the estimate and upper bound.
	offset int64
	// The sum of all frequencies (counts) of the stream so far.
	streamWeight int64
	// The maximum number of samples handed to hashMap.purge; set at construction to
	// min(_SAMPLE_SIZE, maximum map capacity).
	sampleSize int
	// Hash map mapping stored items to approximate counts.
	hashMap *reversePurgeLongHashMap
}
const (
	// strPreambleTokens is the number of comma-separated tokens that ToString emits
	// ahead of the hash map section: serVer, famID, lgMaxMapSize, flags,
	// streamWeight and offset.
	strPreambleTokens = 6
)
// NewLongsSketch builds an empty LongsSketch from the two log2 sizes.
//
// lgMaxMapSize is the log2 of the largest physical size the internal hash map may
// reach. The maximum number of counters is that physical size multiplied by the hash
// map load factor, so both the accuracy and the footprint of the sketch depend on it.
//
// lgCurMapSize is the log2 of the starting (current) physical size of the internal
// hash map.
//
// Both arguments are raised to _LG_MIN_MAP_SIZE when smaller. An error is returned
// only if the underlying hash map cannot be created.
func NewLongsSketch(lgMaxMapSize int, lgCurMapSize int) (*LongsSketch, error) {
	// Neither size may fall below the minimum map size.
	lgMaxMapSize = max(lgMaxMapSize, _LG_MIN_MAP_SIZE)
	lgCurMapSize = max(lgCurMapSize, _LG_MIN_MAP_SIZE)

	table, err := newReversePurgeLongHashMap(1 << lgCurMapSize)
	if err != nil {
		return nil, err
	}

	// Largest number of counters the sketch will ever hold; it caps the purge sample size.
	maxCounters := int(float64(uint64(1<<lgMaxMapSize)) * reversePurgeLongHashMapLoadFactor)

	sk := &LongsSketch{
		lgMaxMapSize: lgMaxMapSize,
		curMapCap:    table.getCapacity(),
		offset:       0,
		sampleSize:   min(_SAMPLE_SIZE, maxCounters),
		hashMap:      table,
	}
	return sk, nil
}
// NewLongsSketchWithMaxMapSize constructs a new LongsSketch with the given maxMapSize
// and the minimum starting map size (_LG_MIN_MAP_SIZE).
//
// maxMapSize determines the physical size of the internal hash map managed by this
// sketch and must be a power of 2. The maximum capacity of this internal hash map is
// maxMapSize multiplied by the hash map load factor. Both the ultimate accuracy and
// size of this sketch are a function of maxMapSize.
//
// An error wrapping the internal.ExactLog2 failure is returned when maxMapSize is
// rejected by that helper.
func NewLongsSketchWithMaxMapSize(maxMapSize int) (*LongsSketch, error) {
	lgMaxMapSize, err := internal.ExactLog2(maxMapSize)
	if err != nil {
		// %w keeps the cause available to errors.Is / errors.As.
		return nil, fmt.Errorf("maxMapSize, %w", err)
	}
	return NewLongsSketch(lgMaxMapSize, _LG_MIN_MAP_SIZE)
}
// NewLongsSketchFromSlice returns a sketch instance of this class from the given slice,
// which must be a byte slice representation of this sketch class.
//
// slc is a byte slice representation of a sketch of this class. The layout read here
// is: the preamble longs (little-endian), followed by the counts of all active items,
// followed by the active items themselves.
//
// An error is returned, rather than a panic, when the preamble fields are inconsistent
// or when slc is too short for the preamble or for the number of active items it
// declares.
func NewLongsSketchFromSlice(slc []byte) (*LongsSketch, error) {
	pre0, err := checkPreambleSize(slc)
	if err != nil {
		return nil, err
	}
	maxPreLongs := internal.FamilyEnum.Frequency.MaxPreLongs
	preLongs := extractPreLongs(pre0)
	serVer := extractSerVer(pre0)
	familyID := extractFamilyID(pre0)
	lgMaxMapSize := extractLgMaxMapSize(pre0)
	lgCurMapSize := extractLgCurMapSize(pre0)
	empty := (extractFlags(pre0) & _EMPTY_FLAG_MASK) != 0

	// Checks
	preLongsEq1 := preLongs == 1
	preLongsEqMax := preLongs == maxPreLongs
	if !preLongsEq1 && !preLongsEqMax {
		return nil, fmt.Errorf("possible Corruption: PreLongs must be 1 or %d: %d", maxPreLongs, preLongs)
	}
	if serVer != _SER_VER {
		return nil, fmt.Errorf("possible Corruption: Ser Ver must be %d: %d", _SER_VER, serVer)
	}
	actFamID := internal.FamilyEnum.Frequency.Id
	if familyID != actFamID {
		return nil, fmt.Errorf("possible Corruption: FamilyID must be %d: %d", actFamID, familyID)
	}
	// The empty flag and a one-long preamble must go together. Rejecting the
	// "not empty but only one preamble long" case here prevents reading preamble
	// longs that do not exist.
	if empty != preLongsEq1 {
		return nil, fmt.Errorf("possible Corruption: Empty Flag set incorrectly: %t", preLongsEq1)
	}
	if empty {
		return NewLongsSketch(lgMaxMapSize, _LG_MIN_MAP_SIZE)
	}

	// Make sure the full preamble is present before touching it.
	preBytes := preLongs << 3
	if len(slc) < preBytes {
		return nil, fmt.Errorf("possible Corruption: Insufficient bytes in array: %d, %d", len(slc), preBytes)
	}
	readLong := func(off int) int64 {
		return int64(binary.LittleEndian.Uint64(slc[off:]))
	}
	activeItems := extractActiveItems(readLong(8))
	streamWeight := readLong(16)
	offset := readLong(24)

	// Validate the declared item count against the slice length before using it.
	reqBytes := preBytes + 2*activeItems*8 //count Arr + Items Arr
	if activeItems < 0 || len(slc) < reqBytes {
		return nil, fmt.Errorf("possible Corruption: Insufficient bytes in array: %d, %d", len(slc), reqBytes)
	}

	fls, err := NewLongsSketch(lgMaxMapSize, lgCurMapSize)
	if err != nil {
		return nil, err
	}
	fls.offset = offset

	// Replay every (item, count) pair into the sketch. Counts come first in the
	// slice, items start right after them.
	itemsOffset := preBytes + (8 * activeItems)
	for i := 0; i < activeItems; i++ {
		count := readLong(preBytes + (i << 3))
		item := readLong(itemsOffset + (i << 3))
		if err := fls.UpdateMany(item, count); err != nil {
			return nil, err
		}
	}
	fls.streamWeight = streamWeight //override streamWeight due to updating
	return fls, nil
}
// NewLongsSketchFromString returns a sketch instance of this class from the given string,
// which must be a string representation of this sketch class.
//
// str is a comma-separated string representation of a sketch of this class: six
// preamble tokens (serVer, famID, lgMaxMapSize, flags, streamWeight, offset), then the
// number of active items, the current map length, and two tokens per active item.
//
// An error is returned when str is empty, has too few tokens, contains a token that
// does not parse as an integer, or fails one of the consistency checks.
func NewLongsSketchFromString(str string) (*LongsSketch, error) {
	if len(str) < 1 {
		return nil, errors.New("string is empty")
	}
	// Drop a single trailing comma, if present, so that Split does not produce an
	// empty final token.
	str = strings.TrimSuffix(str, ",")
	tokens := strings.Split(str, ",")
	if len(tokens) < (strPreambleTokens + 2) {
		return nil, fmt.Errorf("string not long enough: %d", len(tokens))
	}
	serVe, err := strconv.ParseInt(tokens[0], 10, 32)
	if err != nil {
		return nil, err
	}
	famID, err := strconv.ParseInt(tokens[1], 10, 32)
	if err != nil {
		return nil, err
	}
	lgMax, err := strconv.ParseInt(tokens[2], 10, 32)
	if err != nil {
		return nil, err
	}
	flags, err := strconv.ParseInt(tokens[3], 10, 32)
	if err != nil {
		return nil, err
	}
	streamWt, err := strconv.ParseInt(tokens[4], 10, 64)
	if err != nil {
		return nil, err
	}
	offset, err := strconv.ParseInt(tokens[5], 10, 64)
	if err != nil {
		return nil, err
	}
	//should always get at least the next 2 from the map
	numActive, err := strconv.ParseInt(tokens[6], 10, 32)
	if err != nil {
		return nil, err
	}
	lgCurOrigin, err := strconv.ParseUint(tokens[7], 10, 32)
	if err != nil {
		return nil, err
	}
	// tokens[7] is the current map length; its log2 is the number of trailing zeros.
	lgCur := bits.TrailingZeros64(lgCurOrigin)

	//checks
	if serVe != _SER_VER {
		return nil, fmt.Errorf("possible Corruption: Bad SerVer: %d", serVe)
	}
	if famID != int64(internal.FamilyEnum.Frequency.Id) {
		return nil, fmt.Errorf("possible Corruption: Bad Family: %d", famID)
	}
	numTokens := int64(len(tokens))
	empty := flags > 0
	if !empty && (numActive == 0) {
		// Report the token count: numActive is known to be zero on this path.
		return nil, fmt.Errorf("Possible Corruption: !Empty && NumActive=0; strLen: %d", numTokens)
	}
	if (2 * numActive) != (numTokens - strPreambleTokens - 2) {
		return nil, fmt.Errorf("possible Corruption: Incorrect # of tokens: %d, numActive: %d", numTokens, numActive)
	}

	sk, err := NewLongsSketch(int(lgMax), int(lgCur))
	if err != nil {
		return nil, err
	}
	sk.streamWeight = streamWt
	sk.offset = offset
	// The hash map created by NewLongsSketch is replaced by the deserialized one.
	sk.hashMap, err = deserializeFromStringArray(tokens)
	if err != nil {
		return nil, err
	}
	return sk, nil
}
// GetAprioriErrorLongsSketch estimates, before any data is seen, the error of a sketch
// built with maxMapSize on a stream of the given total weight.
//
// maxMapSize is the planned map size to be used when constructing the sketch.
// estimatedTotalStreamWeight is the estimated total stream weight.
//
// The result is GetEpsilonLongsSketch(maxMapSize) multiplied by the stream weight; any
// error from GetEpsilonLongsSketch is passed through unchanged.
func GetAprioriErrorLongsSketch(maxMapSize int, estimatedTotalStreamWeight int64) (float64, error) {
	eps, err := GetEpsilonLongsSketch(maxMapSize)
	if err != nil {
		return 0, err
	}
	weight := float64(estimatedTotalStreamWeight)
	return eps * weight, nil
}
// GetCurrentMapCapacity reports how many counters the sketch can currently hold
// before it has to grow or purge its hash map.
func (s *LongsSketch) GetCurrentMapCapacity() int {
	capacity := s.curMapCap
	return capacity
}
// GetEpsilonLongsSketch returns the epsilon used to compute the a priori error,
// which is 3.5 / maxMapSize.
//
// maxMapSize is the planned map size to be used when constructing the sketch. It
// must be a power of 2, otherwise an error is returned.
func GetEpsilonLongsSketch(maxMapSize int) (float64, error) {
	const numerator = 3.5
	if powerOf2 := internal.IsPowerOf2(maxMapSize); !powerOf2 {
		return 0, errors.New("maxMapSize is not a power of 2")
	}
	return numerator / float64(maxMapSize), nil
}
// GetEstimate returns the estimated frequency of item: the count stored for it in
// the hash map plus the sketch offset.
//
// item is the item to look up.
//
// If the hash map lookup fails, the result is 0 together with that error.
func (s *LongsSketch) GetEstimate(item int64) (int64, error) {
	stored, err := s.hashMap.get(item)
	if err != nil {
		return 0, err
	}
	estimate := stored + s.offset
	return estimate, nil
}
// GetLowerBound returns the lower bound on the frequency of item, which is the
// count stored for it in the hash map (no offset is added).
//
// item is the item to look up.
//
// The value and error come straight from the hash map lookup.
func (s *LongsSketch) GetLowerBound(item int64) (int64, error) {
	// LB = itemCount
	lowerBound, err := s.hashMap.get(item)
	return lowerBound, err
}
// GetUpperBound returns the upper bound on the frequency of item: the count stored
// for it in the hash map plus the sketch offset.
//
// item is the item to look up.
//
// If the hash map lookup fails, the result is 0 together with that error.
func (s *LongsSketch) GetUpperBound(item int64) (int64, error) {
	// UB = itemCount + offset
	stored, err := s.hashMap.get(item)
	if err != nil {
		return 0, err
	}
	upperBound := stored + s.offset
	return upperBound, nil
}
// GetFrequentItemsWithThreshold returns the rows (item, estimate, upper bound, lower
// bound) of the frequent items for a caller-supplied threshold and error type. When
// threshold is lower than GetMaximumError(), GetMaximumError() is used instead.
//
// All active items of the sketch (items that have a counter) are examined.
//
// With errorType = NO_FALSE_NEGATIVES an item is reported when its upper bound is at
// least the effective threshold. No qualifying item is missed (no Type II error), but
// items whose true frequency is below the threshold may be included (false positives).
//
// With any other errorType (NO_FALSE_POSITIVES) an item is reported when its lower
// bound is at least the effective threshold. Every reported item qualifies (no Type I
// error), but qualifying items may be omitted (false negatives).
//
// threshold is the minimum bound for an item to be included in the result.
// errorType selects between no false positives and no false negatives.
//
// The rows are returned sorted by descending estimate.
func (s *LongsSketch) GetFrequentItemsWithThreshold(threshold int64, errorType errorType) ([]*Row, error) {
	// Never go below the sketch's own maximum error.
	effective := max(threshold, s.GetMaximumError())
	return s.sortItems(effective, errorType)
}
// GetFrequentItems returns the rows (item, estimate, upper bound, lower bound) of the
// frequent items using the default threshold, GetMaximumError().
//
// errorType selects between no false positives and no false negatives.
func (s *LongsSketch) GetFrequentItems(errorType errorType) ([]*Row, error) {
	defaultThreshold := s.GetMaximumError()
	return s.sortItems(defaultThreshold, errorType)
}
// GetNumActiveItems reports how many items currently have a counter in the hash map.
func (s *LongsSketch) GetNumActiveItems() int {
	active := s.hashMap.numActive
	return active
}
// GetMaximumError returns an upper bound on the maximum error of GetEstimate(item)
// for any item. It is the sketch offset, i.e. the distance between the upper bound
// and the lower bound of an item.
func (s *LongsSketch) GetMaximumError() int64 {
	maxError := s.offset
	return maxError
}
// GetMaximumMapCapacity reports the largest number of counters the sketch is
// configured to support: 2^lgMaxMapSize scaled by the hash map load factor and
// truncated to an int.
func (s *LongsSketch) GetMaximumMapCapacity() int {
	physicalSize := float64(uint64(1 << s.lgMaxMapSize))
	return int(physicalSize * reversePurgeLongHashMapLoadFactor)
}
// GetStorageBytes returns the number of bytes needed to store this sketch as a slice:
// 8 bytes when there are no active items, otherwise a 32-byte preamble plus 16 bytes
// per active item.
func (s *LongsSketch) GetStorageBytes() int {
	const (
		emptyBytes    = 8
		preambleBytes = 4 * 8
		bytesPerItem  = 16
	)
	active := s.GetNumActiveItems()
	if active == 0 {
		return emptyBytes
	}
	return preambleBytes + (bytesPerItem * active)
}
// GetStreamLength returns the total of all frequencies (weights or counts) that the
// sketch has recorded for the stream so far.
func (s *LongsSketch) GetStreamLength() int64 {
	total := s.streamWeight
	return total
}
// IsEmpty reports whether the sketch has no active items.
func (s *LongsSketch) IsEmpty() bool {
	return s.hashMap.numActive == 0
}
// Update adds one occurrence of item to the sketch. It is shorthand for
// UpdateMany(item, 1).
//
// item is the item whose frequency should be increased.
func (s *LongsSketch) Update(item int64) error {
	const single = 1
	return s.UpdateMany(item, single)
}
// UpdateMany adds count occurrences of item to the sketch.
//
// item is the item whose frequency should be increased. It can be any int64 value
// and is only used by the sketch to determine uniqueness.
// count is the amount by which the frequency of the item is increased. A count of
// zero is a no-op; a negative count returns an error.
//
// When the number of active items exceeds the current capacity, the hash map is
// doubled if it is still below its maximum size; otherwise it is purged and the
// purged amount is added to the sketch offset.
func (s *LongsSketch) UpdateMany(item int64, count int64) error {
	switch {
	case count == 0:
		return nil
	case count < 0:
		return errors.New("count may not be negative")
	}

	s.streamWeight += count
	if err := s.hashMap.adjustOrPutValue(item, count); err != nil {
		return err
	}

	// Still within the threshold: nothing more to do.
	if s.hashMap.numActive <= s.curMapCap {
		return nil
	}

	// Below the target size the map can simply grow.
	if s.hashMap.lgLength < s.lgMaxMapSize {
		if err := s.hashMap.resize(2 * len(s.hashMap.keys)); err != nil {
			return err
		}
		s.curMapCap = s.hashMap.getCapacity()
		return nil
	}

	// At the target size the map must be purged.
	s.offset += s.hashMap.purge(s.sampleSize)
	if s.GetNumActiveItems() > s.GetMaximumMapCapacity() {
		return errors.New("purge did not reduce active items")
	}
	return nil
}
// Merge folds the other sketch into this one, modifying the receiver in place. The
// other sketch may be of a different size.
//
// other is a sketch of this class; a nil or empty other leaves the receiver untouched.
//
// The receiver is returned on success. Every active (item, count) pair of other is
// replayed through UpdateMany; if one of those updates fails, nil and the error are
// returned. Afterwards other's offset is added to the receiver's offset and the stream
// weight is set to the sum of both weights as they were before the merge.
func (s *LongsSketch) Merge(other *LongsSketch) (*LongsSketch, error) {
	if other == nil || other.IsEmpty() {
		return s, nil
	}

	// Capture before merging: UpdateMany also increases streamWeight.
	combinedWeight := s.streamWeight + other.streamWeight

	for it := other.hashMap.iterator(); it.next(); {
		if err := s.UpdateMany(it.getKey(), it.getValue()); err != nil {
			return nil, err
		}
	}

	s.offset += other.offset
	s.streamWeight = combinedWeight // corrected streamWeight
	return s, nil
}
// ToString returns the comma-separated string representation of this sketch: serVer,
// famID, lgMaxMapSize, flags, streamWeight and offset, followed by the hash map's own
// string serialization (numActive, current map length, then the key/value tokens).
//
// The flags token is _EMPTY_FLAG_MASK when the sketch has no active items and 0
// otherwise. An error is returned only if formatting the leading tokens fails.
func (s *LongsSketch) ToString() (string, error) {
	flags := 0
	if s.hashMap.numActive == 0 {
		flags = _EMPTY_FLAG_MASK
	}

	var out strings.Builder
	// Sketch parameters come first.
	if _, err := fmt.Fprintf(&out, "%d,%d,%d,%d,%d,%d,",
		_SER_VER,
		internal.FamilyEnum.Frequency.Id,
		s.lgMaxMapSize,
		flags,
		s.streamWeight,
		s.offset,
	); err != nil {
		return "", err
	}
	// Then the hash map contents.
	out.WriteString(s.hashMap.serializeToString())
	return out.String(), nil
}
// ToSlice returns the little-endian byte slice representation of this sketch.
//
// An empty sketch is written as a single 8-byte preamble long carrying the empty
// flag. Otherwise the output holds the preamble longs (first long, active item count,
// stream weight, offset), then the counts of all active items, then the active items.
func (s *LongsSketch) ToSlice() []byte {
	isEmpty := s.IsEmpty()
	numItems := s.GetNumActiveItems()

	preLongs := 1
	if !isEmpty {
		preLongs = internal.FamilyEnum.Frequency.MaxPreLongs //4
	}

	// First preamble long; everything except the flags byte is common to both forms.
	var first int64
	first = insertPreLongs(int64(preLongs), first)                         //Byte 0
	first = insertSerVer(_SER_VER, first)                                  //Byte 1
	first = insertFamilyID(int64(internal.FamilyEnum.Frequency.Id), first) //Byte 2
	first = insertLgMaxMapSize(int64(s.lgMaxMapSize), first)               //Byte 3
	first = insertLgCurMapSize(int64(s.hashMap.lgLength), first)           //Byte 4

	if isEmpty {
		out := make([]byte, 8)
		first = insertFlags(_EMPTY_FLAG_MASK, first) //Byte 5
		binary.LittleEndian.PutUint64(out, uint64(first))
		return out
	}
	first = insertFlags(0, first) //Byte 5

	// 2 * numItems because both keys and values are longs.
	out := make([]byte, (preLongs+(2*numItems))<<3)

	header := make([]int64, preLongs)
	header[0], header[1], header[2], header[3] =
		first, insertActiveItems(int64(numItems), int64(0)), s.streamWeight, s.offset
	for i, long := range header {
		binary.LittleEndian.PutUint64(out[i<<3:], uint64(long))
	}

	valuesAt := preLongs << 3
	keysAt := valuesAt + (numItems << 3)

	values := s.hashMap.getActiveValues()
	for i := 0; i < numItems; i++ {
		binary.LittleEndian.PutUint64(out[valuesAt+(i<<3):], uint64(values[i]))
	}
	keys := s.hashMap.getActiveKeys()
	for i := 0; i < numItems; i++ {
		binary.LittleEndian.PutUint64(out[keysAt+(i<<3):], uint64(keys[i]))
	}
	return out
}
// Reset returns this sketch to its initial state: a new hash map of the minimum
// size, zero offset and zero stream weight. lgMaxMapSize and sampleSize are kept.
func (s *LongsSketch) Reset() {
	// The creation error is discarded here; Reset has no way to report it.
	fresh, _ := newReversePurgeLongHashMap(1 << _LG_MIN_MAP_SIZE)
	capacity := fresh.getCapacity()

	s.hashMap = fresh
	s.curMapCap = capacity
	s.streamWeight = 0
	s.offset = 0
}
// String returns a human-readable summary of the sketch: a title line, the stream
// length, the maximum error offset, and the hash map's own String output, each
// separated by a newline.
func (s *LongsSketch) String() string {
	parts := []string{
		"FrequentLongsSketch:",
		" Stream Length : " + strconv.FormatInt(s.streamWeight, 10),
		" Max Error Offset : " + strconv.FormatInt(s.offset, 10),
		s.hashMap.String(),
	}
	return strings.Join(parts, "\n")
}
// sortItems collects a Row for every active item whose bound reaches threshold and
// returns the rows ordered by descending estimate.
//
// threshold is the minimum bound for inclusion. errorType picks the bound that is
// compared against it: the upper bound for ErrorTypeEnum.NoFalseNegatives, the lower
// bound for anything else (NO_FALSE_POSITIVES).
//
// The first error from GetEstimate, GetUpperBound or GetLowerBound aborts the scan
// and is returned with a nil slice.
func (s *LongsSketch) sortItems(threshold int64, errorType errorType) ([]*Row, error) {
	rows := []*Row{}
	byUpperBound := errorType == ErrorTypeEnum.NoFalseNegatives

	for it := s.hashMap.iterator(); it.next(); {
		key := it.getKey()
		est, err := s.GetEstimate(key)
		if err != nil {
			return nil, err
		}
		ub, err := s.GetUpperBound(key)
		if err != nil {
			return nil, err
		}
		lb, err := s.GetLowerBound(key)
		if err != nil {
			return nil, err
		}

		bound := lb
		if byUpperBound {
			bound = ub
		}
		if bound < threshold {
			continue
		}
		rows = append(rows, newRow(key, est, ub, lb))
	}

	// Largest estimate first.
	sort.Slice(rows, func(a, b int) bool {
		return rows[a].est > rows[b].est
	})
	return rows, nil
}