cpc/cpc_sketch.go (523 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cpc
import (
"encoding/binary"
"fmt"
"github.com/apache/datasketches-go/internal"
"github.com/twmb/murmur3"
"math"
"math/bits"
"unsafe"
)
// Limits and sizing constants for CPC sketches.
const (
	// Bounds on lgK. minLgK is also the base index into empiricalMaxBytes;
	// presumably these are the limits enforced by checkLgK (defined elsewhere).
	minLgK = 4
	maxLgK = 26

	// empiricalSizeMaxLgK is the largest lgK covered by empiricalMaxBytes.
	empiricalSizeMaxLgK = 19

	// empiricalMaxSizeFactor is the bytes-per-K factor used by
	// getMaxSerializedBytes when lgK exceeds empiricalSizeMaxLgK.
	empiricalMaxSizeFactor = 4.8 / 8.0 // = 0.6

	// maxPreambleSizeBytes is added on top of every payload size estimate.
	maxPreambleSizeBytes = 40
)
// empiricalMaxBytes[lgK-minLgK] is the empirical maximum serialized payload
// size in bytes (excluding the preamble) for lgK = 4 through 19.
var empiricalMaxBytes = []int{
	24, 36, 56, 100,              // lgK 4..7
	180, 344, 660, 1292,          // lgK 8..11
	2540, 5020, 9968, 19836,      // lgK 12..15
	39532, 78880, 157516, 314656, // lgK 16..19
}
// CpcSketch is a CPC (Compressed Probabilistic Counting) sketch that estimates
// the number of distinct items it has been updated with.
//
// Each hashed item selects a coupon: a row in [0, K) with K = 1<<lgK and a
// column in [0, 63]. Depending on the coupon count the sketch is empty, keeps
// every coupon in pairTable (sparse), or keeps an 8-column slidingWindow plus
// the "surprising" cells outside it in pairTable (windowed).
//
// A CpcSketch is not safe for concurrent use: updates write the shared scratch
// buffer and mutate the sketch without any locking.
type CpcSketch struct {
	// seed is the hash seed used for every update.
	seed uint64

	// Common variables.

	// lgK is the base-2 log of K, the number of rows.
	lgK int
	// numCoupons is the number of coupons collected so far.
	numCoupons uint64
	// mergeFlag reports whether the sketch is the result of merging; when set,
	// estimates and bounds use the ICON functions instead of the HIP ones.
	mergeFlag bool
	// fiCol is the First Interesting Column, a speed optimization: hash
	// updates whose column is below fiCol are skipped.
	fiCol int
	// windowOffset is the first column covered by slidingWindow (kept in [0, 56]).
	windowOffset int
	// slidingWindow is either nil or K bytes, one per row, holding the 8
	// columns that start at windowOffset.
	slidingWindow []byte
	// pairTable holds sparse coupons and surprising values outside the window;
	// either nil or of variable size.
	pairTable *pairTable

	// The following variables are only valid in HIP variants.

	// kxp is the HIP "kxp" register; it starts at K.
	kxp float64
	// hipEstAccum accumulates the HIP estimate.
	hipEstAccum float64
	// scratch is a reusable buffer for encoding a value before hashing it.
	scratch [8]byte
}
// NewCpcSketchWithDefault creates an empty sketch with the given lgK that
// hashes its input with internal.DEFAULT_UPDATE_SEED.
func NewCpcSketchWithDefault(lgK int) (*CpcSketch, error) {
	var seed uint64 = internal.DEFAULT_UPDATE_SEED
	return NewCpcSketch(lgK, seed)
}
// NewCpcSketch creates an empty sketch with K = 2^lgK rows whose updates are
// hashed with seed. It returns the error from checkLgK when lgK is rejected.
func NewCpcSketch(lgK int, seed uint64) (*CpcSketch, error) {
	err := checkLgK(lgK)
	if err != nil {
		return nil, err
	}
	sketch := &CpcSketch{seed: seed, lgK: lgK}
	// The HIP kxp register starts out equal to K.
	sketch.kxp = float64(int64(1) << lgK)
	return sketch, nil
}
// NewCpcSketchFromSlice rebuilds a sketch from its compressed serialized form
// (for example, the output of ToCompactSlice) and associates it with seed.
//
// On any failure (decoding, an invalid lgK, or uncompressing the payload) it
// returns a nil sketch and the error; a partially rebuilt sketch is never
// returned.
func NewCpcSketchFromSlice(bytes []byte, seed uint64) (*CpcSketch, error) {
	c, err := importFromMemory(bytes)
	if err != nil {
		return nil, err
	}
	sketch, err := NewCpcSketch(c.LgK, seed)
	if err != nil {
		return nil, err
	}
	sketch.numCoupons = c.NumCoupons
	sketch.windowOffset = c.getWindowOffset()
	sketch.fiCol = c.FiCol
	sketch.mergeFlag = c.MergeFlag
	sketch.kxp = c.Kxp
	sketch.hipEstAccum = c.HipEstAccum
	// slidingWindow and pairTable are still nil from NewCpcSketch;
	// c.uncompress is expected to populate them.
	if err := c.uncompress(sketch); err != nil {
		return nil, err
	}
	return sketch, nil
}
// NewCpcSketchFromSliceWithDefault is NewCpcSketchFromSlice using
// internal.DEFAULT_UPDATE_SEED as the seed.
func NewCpcSketchFromSliceWithDefault(bytes []byte) (*CpcSketch, error) {
	var seed uint64 = internal.DEFAULT_UPDATE_SEED
	return NewCpcSketchFromSlice(bytes, seed)
}
// GetEstimate returns the current distinct-count estimate: the HIP accumulator
// for a sketch built by direct updates, or the ICON estimate for a sketch that
// is the result of a merge.
func (c *CpcSketch) GetEstimate() float64 {
	if !c.mergeFlag {
		return c.hipEstAccum
	}
	return iconEstimate(c.lgK, c.numCoupons)
}
// GetLowerBound returns the lower confidence bound of the estimate for the
// given kappa (presumably the number of standard deviations). Merged sketches
// use the ICON bound; all others use the HIP bound.
func (c *CpcSketch) GetLowerBound(kappa int) float64 {
	if !c.mergeFlag {
		return hipConfidenceLB(c.lgK, c.numCoupons, c.hipEstAccum, kappa)
	}
	return iconConfidenceLB(c.lgK, c.numCoupons, kappa)
}
// GetUpperBound returns the upper confidence bound of the estimate for the
// given kappa (presumably the number of standard deviations). Merged sketches
// use the ICON bound; all others use the HIP bound.
func (c *CpcSketch) GetUpperBound(kappa int) float64 {
	if !c.mergeFlag {
		return hipConfidenceUB(c.lgK, c.numCoupons, c.hipEstAccum, kappa)
	}
	return iconConfidenceUB(c.lgK, c.numCoupons, kappa)
}
// UpdateUint64 adds datum to the sketch by hashing its 8-byte little-endian
// encoding. It reuses the sketch's scratch buffer, so it must not be called
// concurrently with other updates.
func (c *CpcSketch) UpdateUint64(datum uint64) error {
	buf := c.scratch[:]
	binary.LittleEndian.PutUint64(buf, datum)
	lo, hi := hash(buf, c.seed)
	return c.hashUpdate(lo, hi)
}
// UpdateInt64 adds datum to the sketch. It hashes the same bytes as
// UpdateUint64(uint64(datum)), so values with equal bit patterns coincide.
func (c *CpcSketch) UpdateInt64(datum int64) error {
	asUnsigned := uint64(datum)
	return c.UpdateUint64(asUnsigned)
}
// UpdateFloat64 adds datum to the sketch after canonicalizing it: +0.0 and
// -0.0 hash identically, and every NaN is hashed as 0x7ff8000000000000. The
// canonical bits are hashed in little-endian order via the scratch buffer.
func (c *CpcSketch) UpdateFloat64(datum float64) error {
	var canonical uint64
	switch {
	case datum == 0.0:
		// True for both signed zeros; always hash the bits of +0.0.
		canonical = math.Float64bits(0)
	case math.IsNaN(datum):
		canonical = 0x7ff8000000000000
	default:
		canonical = math.Float64bits(datum)
	}
	buf := c.scratch[:]
	binary.LittleEndian.PutUint64(buf, canonical)
	lo, hi := hash(buf, c.seed)
	return c.hashUpdate(lo, hi)
}
// UpdateInt64Slice adds the whole slice to the sketch as a single item, hashed
// with internal.HashInt64SliceMurmur3. An empty slice is ignored.
func (c *CpcSketch) UpdateInt64Slice(datum []int64) error {
	n := len(datum)
	if n == 0 {
		return nil
	}
	lo, hi := internal.HashInt64SliceMurmur3(datum, 0, n, c.seed)
	return c.hashUpdate(lo, hi)
}
// UpdateInt32Slice adds the whole slice to the sketch as a single item, hashed
// with internal.HashInt32SliceMurmur3. An empty slice is ignored.
func (c *CpcSketch) UpdateInt32Slice(datum []int32) error {
	n := len(datum)
	if n == 0 {
		return nil
	}
	lo, hi := internal.HashInt32SliceMurmur3(datum, 0, n, c.seed)
	return c.hashUpdate(lo, hi)
}
// UpdateByteSlice adds the bytes of datum to the sketch as a single item. An
// empty slice is ignored.
func (c *CpcSketch) UpdateByteSlice(datum []byte) error {
	if len(datum) == 0 {
		return nil
	}
	// hash is the seeded 128-bit murmur3 used by every update path.
	lo, hi := hash(datum, c.seed)
	return c.hashUpdate(lo, hi)
}
// UpdateString adds the UTF-8 bytes of datum to the sketch as a single item;
// it hashes exactly what UpdateByteSlice([]byte(datum)) would. An empty string
// is ignored.
func (c *CpcSketch) UpdateString(datum string) error {
	if datum == "" {
		return nil
	}
	// Borrow the string's backing bytes instead of copying them; the slice is
	// only read by the hash.
	raw := unsafe.Slice(unsafe.StringData(datum), len(datum))
	return c.UpdateByteSlice(raw)
}
// hashUpdate folds a 128-bit hash (hash0 = low word, hash1 = high word) into
// the sketch as one coupon.
//
// The coupon's column is the number of leading zeros of hash1, clipped to 63.
// Its row is the low lgK bits of hash0. Both are packed as (row << 6) | col.
// Columns below fiCol are skipped. modifyOffset sets fiCol so that no row has
// a surprise below it, so such updates cannot be novel.
func (c *CpcSketch) hashUpdate(hash0, hash1 uint64) error {
	col := bits.LeadingZeros64(hash1)
	if col < c.fiCol {
		return nil // important speed optimization
	}
	if col > 63 {
		col = 63 // LeadingZeros64 returns 64 when hash1 == 0
	}
	if c.numCoupons == 0 {
		if err := c.promoteEmptyToSparse(); err != nil {
			return err
		}
	}
	k := uint64(1) << c.lgK
	row := hash0 & (k - 1)
	rowCol := (row << 6) | uint64(col)
	// The cell (row 2^26-1, col 63) packs to 0xFFFFFFFF, which reads as -1
	// once truncated to 32 bits. -1 is the "empty slot" value the pair table
	// is scanned for. Merge that cell into (2^26-2, 63) by clearing the row's
	// least-significant bit. This can only happen when lgK == 26. Comparing
	// the 64-bit value against -1 never matches, so compare against the
	// 32-bit all-ones pattern.
	if rowCol == 0xFFFFFFFF {
		rowCol ^= 1 << 6
	}
	if (c.numCoupons << 5) < 3*k {
		return c.updateSparse(int(rowCol))
	}
	return c.updateWindowed(int(rowCol))
}
// promoteEmptyToSparse gives an empty sketch the pair table it uses in sparse
// mode. If the table cannot be created, the sketch is left unchanged.
func (c *CpcSketch) promoteEmptyToSparse() error {
	table, err := NewPairTable(2, 6+c.lgK)
	if err == nil {
		c.pairTable = table
	}
	return err
}
// updateSparse records rowCol while the sketch is sparse (C < 3K/32, with
// C = numCoupons), where every coupon lives in the pair table. A novel coupon
// bumps the count and the HIP state. Once C reaches 3K/32 the sketch is
// promoted to windowed mode.
func (c *CpcSketch) updateSparse(rowCol int) error {
	threeK := 3 * (uint64(1) << c.lgK)
	if c.numCoupons<<5 >= threeK {
		// Precondition violated: C >= 3K/32 means the sketch is no longer sparse.
		return fmt.Errorf("C >= 3K/32")
	}
	if c.pairTable == nil {
		return fmt.Errorf("pairTable is nil")
	}
	novel, err := c.pairTable.maybeInsert(rowCol)
	if err != nil {
		return err
	}
	if !novel {
		return nil
	}
	c.numCoupons++
	c.updateHIP(rowCol)
	if c.numCoupons<<5 < threeK {
		return nil
	}
	// C >= 3K/32: sparse mode is exhausted.
	return c.promoteSparseToWindowed()
}
// updateWindowed records rowCol while the sketch is in windowed mode
// (C >= 3K/32, with C = numCoupons). Each row's columns fall into three zones
// relative to windowOffset:
//   - col < windowOffset: the early zone, where bits default to 1 and the pair
//     table lists the surprising 0's, so a new coupon deletes its entry;
//   - windowOffset <= col < windowOffset+8: bits stored in slidingWindow[row];
//   - col >= windowOffset+8: the late zone, where bits default to 0 and the
//     pair table lists the surprising 1's, so a new coupon inserts an entry.
//
// When C reaches K*27/8 + K*windowOffset, the window slides one column via
// modifyOffset. Violated invariants are reported as errors whose text states
// the violated condition.
func (c *CpcSketch) updateWindowed(rowCol int) error {
	if c.windowOffset < 0 || c.windowOffset > 56 {
		return fmt.Errorf("windowOffset < 0 || windowOffset > 56")
	}
	k := uint64(1) << c.lgK
	c32pre := c.numCoupons << 5
	if c32pre < (3 * k) {
		return fmt.Errorf("C < 3K/32")
	}
	c8pre := c.numCoupons << 3
	w8pre := uint64(c.windowOffset << 3)
	if c8pre >= ((uint64(27) + w8pre) * k) {
		return fmt.Errorf("C >= (K * 27/8) + (K * windowOffset)")
	}

	isNovel := false // true when rowCol is a new coupon
	var err error
	col := rowCol & 63
	switch {
	case col < c.windowOffset:
		// Early zone: inverted logic, a hit removes a surprising 0.
		isNovel, err = c.pairTable.maybeDelete(rowCol)
		if err != nil {
			return err
		}
	case col < c.windowOffset+8:
		// Inside the 8-bit window.
		row := rowCol >> 6
		oldBits := c.slidingWindow[row]
		newBits := oldBits | (1 << (col - c.windowOffset))
		if newBits != oldBits {
			c.slidingWindow[row] = newBits
			isNovel = true
		}
	default:
		// Late zone: normal logic, a hit adds a surprising 1.
		isNovel, err = c.pairTable.maybeInsert(rowCol)
		if err != nil {
			return err
		}
	}
	if !isNovel {
		return nil
	}

	c.numCoupons++
	c.updateHIP(rowCol)
	c8post := c.numCoupons << 3
	if c8post < ((27 + w8pre) * k) {
		return nil
	}
	if err := c.modifyOffset(c.windowOffset + 1); err != nil {
		return err
	}
	if c.windowOffset < 1 || c.windowOffset > 56 {
		return fmt.Errorf("windowOffset < 1 || windowOffset > 56")
	}
	w8post := uint64(c.windowOffset << 3)
	if c8post >= ((uint64(27) + w8post) * k) {
		// The invariant after sliding is C < K*27/8 + K*windowOffset; report the violation.
		return fmt.Errorf("C >= (K * 27/8) + (K * windowOffset)")
	}
	return nil
}
// hash returns the two 64-bit words of the 128-bit murmur3 hash of bs, using
// seed for both seed inputs. Callers treat the first word as the low half.
func hash(bs []byte, seed uint64) (lo, hi uint64) {
	lo, hi = murmur3.SeedSum128(seed, seed, bs)
	return lo, hi
}
// getFormat selects the serialization format for the sketch's current state.
// The format ordinal is a bit set:
//   - bit 0 is set when the sketch is not a merge result (HIP data present);
//   - for SPARSE and HYBRID flavors, bit 1 is always set, so HYBRID is
//     serialized like SPARSE;
//   - for other flavors, bit 2 marks a sliding window and bit 1 a non-empty
//     pair table.
func (c *CpcSketch) getFormat() CpcFormat {
	hipBit := 0
	if !c.mergeFlag {
		hipBit = 1
	}
	switch c.getFlavor() {
	case CpcFlavorHybrid, CpcFlavorSparse:
		return CpcFormat(2 | hipBit)
	}
	ordinal := hipBit
	if c.slidingWindow != nil {
		ordinal |= 4
	}
	if c.pairTable != nil && c.pairTable.numPairs > 0 {
		ordinal |= 2
	}
	return CpcFormat(ordinal)
}
// getFlavor classifies the sketch from lgK and its coupon count alone.
func (c *CpcSketch) getFlavor() CpcFlavor {
	lgK, coupons := c.lgK, c.numCoupons
	return determineFlavor(lgK, coupons)
}
// updateHIP advances the HIP estimator for a newly collected coupon at rowCol.
// The accumulator grows by K/kxp (the "1/p" term), then kxp is reduced by
// 2^-(col+1).
func (c *CpcSketch) updateHIP(rowCol int) {
	col := rowCol & 63
	c.hipEstAccum += float64(int(1)<<c.lgK) / c.kxp
	// NOTE(review): the InvPow2 error is ignored, as before; col+1 is always in [1, 64].
	delta, _ := internal.InvPow2(col + 1)
	c.kxp -= delta
}
// promoteSparseToWindowed converts a sparse sketch to windowed mode.
//
// Coupons in columns 0-7 move into a new K-byte sliding window (window offset
// 0). All other coupons are re-inserted into a fresh pair table. The sketch's
// window and table are replaced only after the rebuild succeeds, so on error
// the sketch keeps its previous sparse representation.
func (c *CpcSketch) promoteSparseToWindowed() error {
	oldTable := c.pairTable
	if oldTable == nil {
		return fmt.Errorf("promoteSparseToWindowed: pairTable is nil")
	}
	window := make([]byte, 1<<c.lgK)
	newTable, err := NewPairTable(2, 6+c.lgK)
	if err != nil {
		// Previously ignored, which left newTable nil and panicked below.
		return fmt.Errorf("promoteSparseToWindowed: %w", err)
	}
	oldSlots := oldTable.slotsArr
	oldNumSlots := 1 << oldTable.lgSizeInts
	for i := 0; i < oldNumSlots; i++ {
		rowCol := oldSlots[i]
		if rowCol == -1 {
			continue // empty slot
		}
		col := rowCol & 63
		if col < 8 {
			row := rowCol >> 6
			window[row] |= 1 << col
			continue
		}
		isNovel, err := newTable.maybeInsert(rowCol)
		if err != nil {
			return fmt.Errorf("maybeInsert: %w", err)
		}
		if !isNovel {
			// Every slot in the old table is distinct, so a duplicate means corruption.
			return fmt.Errorf("promoteSparseToWindowed: unexpected collision")
		}
	}
	c.slidingWindow = window
	c.pairTable = newTable
	return nil
}
// reset returns the sketch to the empty state NewCpcSketch produces, keeping
// its lgK and seed.
func (c *CpcSketch) reset() {
	c.slidingWindow, c.pairTable = nil, nil
	c.numCoupons = 0
	c.fiCol, c.windowOffset = 0, 0
	c.mergeFlag = false
	c.hipEstAccum = 0
	c.kxp = float64(int64(1) << c.lgK)
}
// rowColUpdate adds an already-packed coupon ((row << 6) | col) to the sketch.
// It dispatches to sparse or windowed handling the same way hashUpdate does.
// Coupons whose column is below fiCol are ignored.
func (c *CpcSketch) rowColUpdate(rowCol int) error {
	if (rowCol & 63) < c.fiCol {
		return nil
	}
	if c.numCoupons == 0 {
		if err := c.promoteEmptyToSparse(); err != nil {
			return err
		}
	}
	sparse := (c.numCoupons << 5) < 3*(uint64(1)<<c.lgK)
	if sparse {
		return c.updateSparse(rowCol)
	}
	return c.updateWindowed(rowCol)
}
// modifyOffset slides the window one column to the right. newOffset must equal
// windowOffset+1 and lie in [0, 56]. The window and pair table are rebuilt
// from the full bit matrix:
//   - slidingWindow[i] receives columns newOffset..newOffset+7 of row i;
//   - the pair table is refilled with the surprising 0's left of the window
//     and the surprising 1's right of it;
//   - fiCol is recomputed as the lowest column containing any surprise,
//     capped at newOffset.
//
// On every 8th offset, kxp is recomputed from the matrix with refreshKXP,
// presumably to limit drift in the incrementally updated value.
//
// A duplicate insertion while refilling the pair table is reported as an error
// instead of silently abandoning the rebuild.
func (c *CpcSketch) modifyOffset(newOffset int) error {
	if newOffset < 0 || newOffset > 56 {
		return fmt.Errorf("newOffset < 0 || newOffset > 56")
	}
	if newOffset != (c.windowOffset + 1) {
		return fmt.Errorf("newOffset != (c.windowOffset + 1)")
	}
	if c.slidingWindow == nil || c.pairTable == nil {
		return fmt.Errorf("slidingWindow == nil || pairTable == nil")
	}
	k := 1 << c.lgK
	bitMatrix, err := c.bitMatrixOfSketch()
	if err != nil {
		return err
	}
	if (newOffset & 0x7) == 0 {
		c.refreshKXP(bitMatrix)
	}
	c.pairTable.clear()
	// Build the masks in uint64 so they stay correct where int is 32 bits.
	maskForClearingWindow := ^(uint64(0xFF) << newOffset)
	maskForFlippingEarlyZone := (uint64(1) << newOffset) - 1
	var allSurprisesORed uint64
	for i := 0; i < k; i++ {
		pattern := bitMatrix[i]
		c.slidingWindow[i] = byte((pattern >> newOffset) & 0xFF)
		pattern &= maskForClearingWindow
		// Turn surprising 0's in the early zone into 1's (and vice versa), so
		// that every surprise is a set bit in pattern.
		pattern ^= maskForFlippingEarlyZone
		allSurprisesORed |= pattern // cheap way to recompute fiCol
		for pattern != 0 {
			col := bits.TrailingZeros64(pattern)
			pattern ^= 1 << col // erase the bit just visited
			rowCol := (i << 6) | col
			isNovel, err := c.pairTable.maybeInsert(rowCol)
			if err != nil {
				return err
			}
			if !isNovel {
				// The table was just cleared and each (row, col) is visited
				// once, so a duplicate means the sketch is corrupt.
				return fmt.Errorf("modifyOffset: unexpected collision inserting rowCol %d", rowCol)
			}
		}
	}
	c.windowOffset = newOffset
	c.fiCol = bits.TrailingZeros64(allSurprisesORed)
	if c.fiCol > newOffset {
		c.fiCol = newOffset // corner case: no surprises below the window
	}
	return nil
}
// refreshKXP recomputes kxp from the full bit matrix instead of relying on the
// incrementally maintained value.
//
// For accuracy, each row's eight bytes are summed separately per byte position
// using kxpByteLookup. The sums are then combined with weights 2^(-8*j) for
// j = 6 down to 0, smallest weights first. The top byte (j = 7) does not
// contribute.
func (c *CpcSketch) refreshKXP(bitMatrix []uint64) {
	k := 1 << c.lgK
	var byteSums [8]float64
	for i := 0; i < k; i++ {
		for j, bitsLeft := 0, bitMatrix[i]; j < 8; j, bitsLeft = j+1, bitsLeft>>8 {
			byteSums[j] += kxpByteLookup[int(bitsLeft&0xFF)]
		}
	}
	var total float64
	for j := 6; j >= 0; j-- {
		weight, _ := internal.InvPow2(8 * j) // 8*j <= 48
		total += weight * byteSums[j]
	}
	c.kxp = total
}
// bitMatrixOfSketch expands the sketch into K 64-bit rows, one bit per
// (row, col) cell. The set bits are the collected coupons; validate compares
// their count with numCoupons.
//
// An empty sketch yields an all-zero matrix, never nil. It returns an error if
// windowOffset is outside [0, 56] or a non-empty sketch has no pair table.
func (c *CpcSketch) bitMatrixOfSketch() ([]uint64, error) {
	k := uint64(1) << c.lgK
	offset := c.windowOffset
	if offset < 0 || offset > 56 {
		return nil, fmt.Errorf("offset < 0 || offset > 56")
	}
	matrix := make([]uint64, k)
	if c.numCoupons == 0 {
		return matrix, nil // a matrix of zeros rather than nil
	}
	// Every row starts with its early zone (columns below offset) filled with
	// ones, and pair-table entries then flip the exceptions. This keeps the
	// cost O(K) rather than O(C). The mask is built in uint64, because an int
	// shift by 32 or more is wrong where int is 32 bits.
	defaultRow := (uint64(1) << offset) - 1
	for i := range matrix {
		matrix[i] = defaultRow
	}
	if c.slidingWindow != nil { // window mode, not sparse mode
		for i, v := range c.slidingWindow {
			// Place the window bits, trusting the sketch's current offset.
			matrix[i] |= uint64(v) << offset
		}
	}
	table := c.pairTable
	if table == nil {
		return nil, fmt.Errorf("table == nil")
	}
	slots := table.slotsArr
	numSlots := 1 << table.lgSizeInts
	for i := 0; i < numSlots; i++ {
		rowCol := slots[i]
		if rowCol == -1 {
			continue // empty slot
		}
		col := rowCol & 63
		row := rowCol >> 6
		// Flip the bit from its default: 1 -> 0 in the early zone,
		// 0 -> 1 in the late zone.
		matrix[row] ^= 1 << col
	}
	return matrix, nil
}
// ToCompactSlice serializes the sketch into its compressed byte form, which
// is presumably what NewCpcSketchFromSlice accepts. The sketch is first
// converted to a compressed state and that state is then exported.
func (c *CpcSketch) ToCompactSlice() ([]byte, error) {
	state, err := NewCpcCompressedStateFromSketch(c)
	if err != nil {
		return nil, err
	}
	return state.exportToMemory()
}
// getFamily returns the family id registered for CPC sketches in
// internal.FamilyEnum.
func (c *CpcSketch) getFamily() int {
	id := internal.FamilyEnum.CPC.Id
	return id
}
// GetLgK reports lgK, the base-2 logarithm of K (the sketch's row count).
func (c *CpcSketch) GetLgK() int { return c.lgK }
// isEmpty reports whether the sketch has not collected any coupon yet.
func (c *CpcSketch) isEmpty() bool { return c.numCoupons == 0 }
// validate is a consistency check. It rebuilds the bit matrix, counts its set
// bits and reports whether that count equals numCoupons. Errors from building
// the matrix are passed through.
func (c *CpcSketch) validate() (bool, error) {
	matrix, err := c.bitMatrixOfSketch()
	if err != nil {
		return false, err
	}
	consistent := countBitsSetInMatrix(matrix) == c.numCoupons
	return consistent, nil
}
// Copy returns a deep copy of the sketch. The sliding window and pair table
// are duplicated, so later updates to either sketch do not affect the other.
//
// If the pair table cannot be copied, Copy returns a nil sketch and the
// wrapped error instead of a copy that has silently lost its coupons.
func (c *CpcSketch) Copy() (*CpcSketch, error) {
	clone, err := NewCpcSketch(c.lgK, c.seed)
	if err != nil {
		// Should not happen: c.lgK was already accepted when c was created.
		return nil, err
	}
	clone.numCoupons = c.numCoupons
	clone.mergeFlag = c.mergeFlag
	clone.fiCol = c.fiCol
	clone.windowOffset = c.windowOffset
	clone.kxp = c.kxp
	clone.hipEstAccum = c.hipEstAccum

	if c.slidingWindow != nil {
		clone.slidingWindow = make([]byte, len(c.slidingWindow))
		copy(clone.slidingWindow, c.slidingWindow)
	}
	if c.pairTable != nil {
		table, err := c.pairTable.copy()
		if err != nil {
			return nil, fmt.Errorf("copying pair table: %w", err)
		}
		clone.pairTable = table
	}
	// scratch is only a per-call encoding buffer. Arrays copy by value, so this
	// keeps the clone identical without sharing memory.
	clone.scratch = c.scratch
	return clone, nil
}
// String renders the sketch by serializing it with ToCompactSlice and
// formatting the bytes with CpcSketchToString (non-detailed).
// fmt.Stringer cannot return an error, so a failure is described in the
// returned text instead of producing an empty string.
func (c *CpcSketch) String() string {
	mem, err := c.ToCompactSlice()
	if err != nil {
		return fmt.Sprintf("CpcSketch(serialization error: %v)", err)
	}
	str, err := CpcSketchToString(mem, false)
	if err != nil {
		return fmt.Sprintf("CpcSketch(formatting error: %v)", err)
	}
	return str
}
// getMaxSerializedBytes returns an upper bound, in bytes, on the serialized
// size of a sketch with the given lgK, including maxPreambleSizeBytes.
//
// For lgK up to empiricalSizeMaxLgK the payload bound comes from the
// empiricalMaxBytes table. Above that it is extrapolated as
// empiricalMaxSizeFactor * K, truncated to an int. Any error from checkLgK is
// returned as-is.
func getMaxSerializedBytes(lgK int) (int, error) {
	if err := checkLgK(lgK); err != nil {
		return 0, err
	}
	var payload int
	if lgK > empiricalSizeMaxLgK {
		payload = int(empiricalMaxSizeFactor * float64(int(1)<<lgK))
	} else {
		payload = empiricalMaxBytes[lgK-minLgK]
	}
	return payload + maxPreambleSizeBytes, nil
}