hll/hll_sketch.go (233 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package hll is dedicated to streaming algorithms that enable estimation of the
// cardinality of a stream of items.
//
// HllSketch and Union are the public facing classes of this high performance implementation of Phillipe Flajolet's
// HyperLogLog algorithm but with significantly improved error behavior and important features that can be
// essential for large production systems that must handle massive data.
package hll
import (
"encoding/binary"
"fmt"
"math/bits"
"unsafe"
"github.com/apache/datasketches-go/internal"
"github.com/twmb/murmur3"
)
type HllSketch interface {
// Copy returns a clone of this sketch.
Copy() (HllSketch, error)
// CopyAs returns a clone of this sketch with the specified TgtHllType.
//
// - tgtHllType, the TgtHllType enum
CopyAs(tgtHllType TgtHllType) (HllSketch, error)
// GetCompositeEstimate is less accurate than GetEstimate method and is automatically used
// when the sketch has gone through union operations where the more accurate HIP estimator
// cannot be used
// This is made public only for error characterization software that exists in separate package and is not
// intended for normal use.
GetCompositeEstimate() (float64, error)
// GetEstimate returns the cardinality estimate
GetEstimate() (float64, error)
// UpdateUInt64 present the given unsigned 64-bit integer as a potential unique item.
UpdateUInt64(datum uint64) error
// UpdateInt64 present the given signed 64-bit integer as a potential unique item.
UpdateInt64(datum int64) error
// UpdateSlice present the given byte slice as a potential unique item.
UpdateSlice(datum []byte) error
// UpdateString present the given string as a potential unique item.
UpdateString(datum string) error
// Reset resets the sketch to empty, but does not change the configured values of lgConfigK and tgtHllType.
Reset() error
// GetLowerBound gets the approximate lower error bound given the specifified numbers of standard deviations.
//
// - numStdDev, this must be an integer between 1 and 3, inclusive.
GetLowerBound(numStdDev int) (float64, error)
// GetUpperBound gets the approximate upper error bound given the specified number of standard deviations.
//
// - numStdDev, this must be an integer between 1 and 3, inclusive.
GetUpperBound(numStdDev int) (float64, error)
// IsEmpty returns true if the sketch is empty.
IsEmpty() bool
// GetLgConfigK returns the lgConfigK of the sketch.
GetLgConfigK() int
// GetTgtHllType returns the TgtHllType of the sketch.
GetTgtHllType() TgtHllType
// GetCurMode returns the current mode of the sketch: LIST, SET, HLL.
GetCurMode() curMode
// GetUpdatableSerializationBytes gets the size in bytes of the current sketch when serialized using
// ToUpdatableSlice.
GetUpdatableSerializationBytes() int
// ToCompactSlice serializes the sketch to a slice, compacting data structures
// where feasible to eliminate unused storage in the serialized image.
ToCompactSlice() ([]byte, error)
// ToUpdatableSlice serializes the sketch as a byte slice in an updatable form.
// The updatable form is larger than the compact form.
ToUpdatableSlice() ([]byte, error)
GetSerializationVersion() int
couponUpdate(coupon int) (hllSketchStateI, error)
iterator() pairIterator
}
type hllSketchStateI interface {
GetCompositeEstimate() (float64, error)
GetEstimate() (float64, error)
GetHipEstimate() (float64, error)
GetLowerBound(numStdDev int) (float64, error)
GetUpperBound(numStdDev int) (float64, error)
IsEmpty() bool
GetLgConfigK() int
GetTgtHllType() TgtHllType
GetCurMode() curMode
GetUpdatableSerializationBytes() int
ToCompactSlice() ([]byte, error)
ToUpdatableSlice() ([]byte, error)
getMemDataStart() int
getPreInts() int
isOutOfOrder() bool
isRebuildCurMinNumKxQFlag() bool
putOutOfOrder(oooFlag bool)
putRebuildCurMinNumKxQFlag(rebuildCurMinNumKxQFlag bool)
copyAs(tgtHllType TgtHllType) (hllSketchStateI, error)
copy() (hllSketchStateI, error)
mergeTo(dest HllSketch) error
couponUpdate(coupon int) (hllSketchStateI, error)
iterator() pairIterator
}
type hllSketchState struct { // extends BaseHllSketch
sketch hllSketchStateI
scratch [8]byte
}
func newHllSketchState(coupon hllSketchStateI) HllSketch {
return &hllSketchState{
sketch: coupon,
scratch: [8]byte{},
}
}
// NewHllSketch constructs a new sketch with the type of HLL sketch to configure
//
// - lgConfigK, the Log2 of K for the target HLL sketch. This value must be
//
// between 4 and 21 inclusively.
//
// - tgtHllType. the desired HLL type.
func NewHllSketch(lgConfigK int, tgtHllType TgtHllType) (HllSketch, error) {
lgK := lgConfigK
lgK, err := checkLgK(lgK)
if err != nil {
return nil, err
}
couponList, err := newCouponList(lgK, tgtHllType, curModeList)
if err != nil {
return nil, err
}
return newHllSketchState(&couponList), nil
}
// NewHllSketchWithDefault constructs a new on-heap sketch with the default lgK and tgtHllType.
func NewHllSketchWithDefault() (HllSketch, error) {
return NewHllSketch(defaultLgK, TgtHllTypeDefault)
}
// NewHllSketchFromSlice deserialize a given byte slice, which must be a valid HllSketch image and may have data.
//
// - bytes, the given byte slice, this slice is not modified and is not retained by the sketch
func NewHllSketchFromSlice(bytes []byte, checkRebuild bool) (HllSketch, error) {
if len(bytes) < 8 {
return nil, fmt.Errorf("input array too small: %d", len(bytes))
}
curMode, err := checkPreamble(bytes)
if err != nil {
return nil, err
}
if curMode == curModeHll {
tgtHllType := extractTgtHllType(bytes)
if tgtHllType == TgtHllTypeHll4 {
sk, err := deserializeHll4(bytes)
if err != nil {
return nil, err
}
return newHllSketchState(sk), nil
} else if tgtHllType == TgtHllTypeHll6 {
return newHllSketchState(deserializeHll6(bytes)), nil
} else {
a := newHllSketchState(deserializeHll8(bytes))
if checkRebuild {
err := checkRebuildCurMinNumKxQ(a)
if err != nil {
return nil, err
}
}
return a, nil
}
} else if curMode == curModeList {
cp, err := deserializeCouponList(bytes)
if err != nil {
return nil, err
}
return newHllSketchState(cp), nil
} else {
chs, err := deserializeCouponHashSet(bytes)
if err != nil {
return nil, err
}
return newHllSketchState(chs), nil
}
}
func (h *hllSketchState) Copy() (HllSketch, error) {
sketch, err := h.sketch.copy()
if err != nil {
return nil, err
}
return newHllSketchState(sketch), nil
}
func (h *hllSketchState) CopyAs(tgtHllType TgtHllType) (HllSketch, error) {
sketch, err := h.sketch.copyAs(tgtHllType)
if err != nil {
return nil, err
}
return newHllSketchState(sketch), nil
}
func (h *hllSketchState) GetCompositeEstimate() (float64, error) {
return h.sketch.GetCompositeEstimate()
}
func (h *hllSketchState) GetEstimate() (float64, error) {
return h.sketch.GetEstimate()
}
func (h *hllSketchState) GetHipEstimate() (float64, error) {
return h.sketch.GetHipEstimate()
}
func (h *hllSketchState) GetUpperBound(numStdDev int) (float64, error) {
return h.sketch.GetUpperBound(numStdDev)
}
func (h *hllSketchState) GetLowerBound(numStdDev int) (float64, error) {
return h.sketch.GetLowerBound(numStdDev)
}
func (h *hllSketchState) GetUpdatableSerializationBytes() int {
return h.sketch.GetUpdatableSerializationBytes()
}
func (h *hllSketchState) UpdateUInt64(datum uint64) error {
binary.LittleEndian.PutUint64(h.scratch[:], datum)
_, err := h.couponUpdate(coupon(h.hash(h.scratch[:])))
return err
}
func (h *hllSketchState) UpdateInt64(datum int64) error {
return h.UpdateUInt64(uint64(datum))
}
func (h *hllSketchState) UpdateSlice(datum []byte) error {
if len(datum) == 0 {
return nil
}
_, err := h.couponUpdate(coupon(h.hash(datum)))
return err
}
func (h *hllSketchState) UpdateString(datum string) error {
// get a slice to the string data (avoiding a copy to heap)
return h.UpdateSlice(unsafe.Slice(unsafe.StringData(datum), len(datum)))
}
func (h *hllSketchState) IsEmpty() bool {
return h.sketch.IsEmpty()
}
func (h *hllSketchState) ToCompactSlice() ([]byte, error) {
return h.sketch.ToCompactSlice()
}
func (h *hllSketchState) ToUpdatableSlice() ([]byte, error) {
return h.sketch.ToUpdatableSlice()
}
func (h *hllSketchState) GetLgConfigK() int {
return h.sketch.GetLgConfigK()
}
func (h *hllSketchState) GetTgtHllType() TgtHllType {
return h.sketch.GetTgtHllType()
}
func (h *hllSketchState) GetCurMode() curMode {
return h.sketch.GetCurMode()
}
func (h *hllSketchState) Reset() error {
lgK, err := checkLgK(h.sketch.GetLgConfigK())
if err != nil {
return err
}
couponList, err := newCouponList(lgK, h.sketch.GetTgtHllType(), curModeList)
if err != nil {
return err
}
h.sketch = &couponList
return nil
}
func (h *hllSketchState) iterator() pairIterator {
return h.sketch.iterator()
}
func coupon(hashLo uint64, hashHi uint64) int {
addr26 := hashLo & keyMask26
lz := uint64(bits.LeadingZeros64(hashHi))
value := min(lz, 62) + 1
return int((value << keyBits26) | addr26)
}
func (h *hllSketchState) couponUpdate(coupon int) (hllSketchStateI, error) {
if (coupon >> keyBits26) == empty {
return h.sketch, nil
}
sk, err := h.sketch.couponUpdate(coupon)
h.sketch = sk
return h.sketch, err
}
func (h *hllSketchState) putRebuildCurMinNumKxQFlag(rebuildCurMinNumKxQFlag bool) {
h.sketch.putRebuildCurMinNumKxQFlag(rebuildCurMinNumKxQFlag)
}
func (h *hllSketchState) mergeTo(dest HllSketch) error {
return h.sketch.mergeTo(dest)
}
// GetSerializationVersion returns the serialization version used by this sketch.
func (h *hllSketchState) GetSerializationVersion() int {
return serVer
}
func (h *hllSketchState) hash(bs []byte) (uint64, uint64) {
return murmur3.SeedSum128(internal.DEFAULT_UPDATE_SEED, internal.DEFAULT_UPDATE_SEED, bs)
}