cpc/cpc_compressed_state.go (936 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cpc
import (
"fmt"
"github.com/apache/datasketches-go/internal"
"math/bits"
)
// Indices into the three-element ptrArr scratch slice that
// lowLevelCompressPairs and lowLevelUncompressPairs use to hand their
// bit-stream state to writeUnary / readUnary and to read the updated
// state back afterwards.
const (
	NextWordIdx = 0 // ptrArr[0]: nextWordIndex, index of the next word in compressedWords
	BitBuf = 1 // ptrArr[1]: bitBuf, the pending bits not yet flushed or consumed
	BufBits = 2 // ptrArr[2]: bufBits, the number of valid bits currently in bitBuf
)
// CpcCompressedState is the compressed form of a CpcSketch. It is produced
// by NewCpcCompressedStateFromSketch or importFromMemory, and is turned back
// into a sketch by uncompressSketch or into bytes by exportToMemory.
type CpcCompressedState struct {
	// CsvIsValid is set when the source sketch had a pair table, or when
	// bit 2 of the imported format ordinal is set.
	CsvIsValid bool
	// WindowIsValid is set when the source sketch had a sliding window, or
	// when bit 4 of the imported format ordinal is set.
	WindowIsValid bool
	// LgK is the base-2 logarithm of K; the code derives k as 1 << LgK.
	LgK int
	// SeedHash is compared with the hash of the caller's seed in uncompressSketch.
	SeedHash int16
	// FiCol is copied to and from the sketch's fiCol field.
	FiCol int
	MergeFlag bool // complement of HIP Flag
	// NumCoupons is copied to and from the sketch's numCoupons field.
	NumCoupons uint64
	// Kxp and HipEstAccum are copied to and from the sketch's kxp and
	// hipEstAccum fields; they are only read from serialized bytes for the
	// HIP formats.
	Kxp float64
	HipEstAccum float64
	// NumCsv is the number of pairs encoded in CsvStream.
	NumCsv uint64
	CsvStream []int // may be longer than required
	// CsvLengthInts is the number of words of CsvStream that are in use.
	CsvLengthInts int
	CwStream []int // may be longer than required
	// CwLengthInts is the number of words of CwStream that are in use.
	CwLengthInts int
}
// NewCpcCompressedState returns an otherwise zero-valued compressed state
// for the given lgK and seed hash, with Kxp initialised to k = 1 << lgK.
func NewCpcCompressedState(lgK int, seedHash int16) *CpcCompressedState {
	state := new(CpcCompressedState)
	state.LgK = lgK
	state.SeedHash = seedHash
	state.Kxp = float64(int(1) << lgK)
	return state
}
// NewCpcCompressedStateFromSketch builds the compressed state of sketch.
// It copies the scalar fields, records which of the pair table and sliding
// window are present, and then compresses the sketch contents.
//
// An error from the seed-hash computation yields a nil state. An error from
// compression is returned together with the partially populated state.
func NewCpcCompressedStateFromSketch(sketch *CpcSketch) (*CpcCompressedState, error) {
	hash, hashErr := internal.ComputeSeedHash(int64(sketch.seed))
	if hashErr != nil {
		return nil, hashErr
	}

	result := NewCpcCompressedState(sketch.lgK, hash)
	result.CsvIsValid = sketch.pairTable != nil
	result.WindowIsValid = sketch.slidingWindow != nil
	result.FiCol = sketch.fiCol
	result.MergeFlag = sketch.mergeFlag
	result.NumCoupons = sketch.numCoupons
	result.Kxp = sketch.kxp
	result.HipEstAccum = sketch.hipEstAccum

	compressErr := result.compress(sketch)
	return result, compressErr
}
// getRequiredSerializedBytes returns the serialized size in bytes: the
// preamble ints defined for the current format plus the used lengths of the
// CSV and window streams, at four bytes per int.
func (c *CpcCompressedState) getRequiredSerializedBytes() int {
	const bytesPerInt = 4
	totalInts := getDefinedPreInts(c.getFormat()) + c.CsvLengthInts + c.CwLengthInts
	return bytesPerInt * totalInts
}
// getWindowOffset returns the window offset that determineCorrectOffset
// derives from this state's LgK and NumCoupons.
func (c *CpcCompressedState) getWindowOffset() int {
	offset := determineCorrectOffset(c.LgK, c.NumCoupons)
	return offset
}
// getFormat derives the serialization format from the state's contents.
// The ordinal is a three-bit value: bit 0 is set when MergeFlag is false
// (HIP), bit 1 when there are compressed surprising values, and bit 2 when
// there is a compressed window.
func (c *CpcCompressedState) getFormat() CpcFormat {
	const (
		hipBit    = 1
		csvBit    = 2
		windowBit = 4
	)
	var ord int
	if !c.MergeFlag {
		ord |= hipBit
	}
	if c.NumCsv > 0 {
		ord |= csvBit
	}
	if c.CwLengthInts > 0 {
		ord |= windowBit
	}
	return CpcFormat(ord)
}
// compress fills the CSV and/or window streams of c from src, dispatching
// on the sketch's flavor.
//
// An error from the flavor-specific compressor is returned as-is, before
// any stream sanity check runs, so the root cause is never masked by a
// follow-on "stream is null" error. After a successful compression the
// expected stream layout is verified: sparse and hybrid flavors must
// produce only a CSV stream, pinned and sliding flavors must produce a
// window stream.
func (c *CpcCompressedState) compress(src *CpcSketch) error {
	srcFlavor := src.getFlavor()
	switch srcFlavor {
	case CpcFlavorEmpty:
		return nil
	case CpcFlavorSparse:
		if err := c.compressSparseFlavor(src); err != nil {
			return err
		}
		if c.CwStream != nil {
			return fmt.Errorf("compress: sparse flavor %v CwStream not null %v", srcFlavor, c.CwStream)
		}
		if c.CsvStream == nil {
			return fmt.Errorf("compress: sparse flavor %v CsvStream is null", srcFlavor)
		}
	case CpcFlavorHybrid:
		if err := c.compressHybridFlavor(src); err != nil {
			return err
		}
		if c.CwStream != nil {
			return fmt.Errorf("compress: hybrid flavor %v CwStream not null %v", srcFlavor, c.CwStream)
		}
		if c.CsvStream == nil {
			return fmt.Errorf("compress: hybrid flavor %v CsvStream is null", srcFlavor)
		}
	case CpcFlavorPinned:
		if err := c.compressPinnedFlavor(src); err != nil {
			return err
		}
		if c.CwStream == nil {
			return fmt.Errorf("compress: pinned flavor %v CwStream is null", srcFlavor)
		}
	case CpcFlavorSliding:
		if err := c.compressSlidingFlavor(src); err != nil {
			return err
		}
		if c.CwStream == nil {
			return fmt.Errorf("compress: sliding flavor %v CwStream is null", srcFlavor)
		}
	default:
		return fmt.Errorf("unable to compress flavor %v", srcFlavor)
	}
	return nil
}
// uncompress expands the streams held by c into src, dispatching on the
// flavor reported by src. The sparse and pinned flavors are pre-checked
// here for the absence or presence of the window stream respectively.
func (c *CpcCompressedState) uncompress(src *CpcSketch) error {
	switch flavor := src.getFlavor(); flavor {
	case CpcFlavorEmpty:
		return nil
	case CpcFlavorSparse:
		if c.CwStream != nil {
			return fmt.Errorf("uncompress: sparse flavor %v CwStream not null %v", flavor, c.CwStream)
		}
		return c.uncompressSparseFlavor(src)
	case CpcFlavorHybrid:
		return c.uncompressHybridFlavor(src)
	case CpcFlavorPinned:
		if c.CwStream == nil {
			return fmt.Errorf("uncompress: pinned flavor %v CwStream is null", flavor)
		}
		return c.uncompressPinnedFlavor(src)
	case CpcFlavorSliding:
		return c.uncompressSlidingFlavor(src)
	default:
		return fmt.Errorf("unable to uncompress flavor %v", flavor)
	}
}
// uncompressSketch rebuilds a CpcSketch from source using the given seed.
// It fails if the hash of seed differs from source.SeedHash, if the sketch
// cannot be created, or if uncompressing the streams fails.
func uncompressSketch(source *CpcCompressedState, seed uint64) (*CpcSketch, error) {
	wantHash, err := internal.ComputeSeedHash(int64(seed))
	if err != nil {
		return nil, err
	}
	if wantHash != source.SeedHash {
		return nil, fmt.Errorf("seed hash mismatch: computed %d != source %d", wantHash, source.SeedHash)
	}

	result, err := NewCpcSketch(source.LgK, seed)
	if err != nil {
		return nil, err
	}

	// Copy the scalar state; the window offset is derived from lgK and the
	// coupon count rather than stored.
	result.numCoupons = source.NumCoupons
	result.windowOffset = source.getWindowOffset()
	result.fiCol = source.FiCol
	result.mergeFlag = source.MergeFlag
	result.kxp = source.Kxp
	result.hipEstAccum = source.HipEstAccum

	// The window and pair table are rebuilt from the compressed streams below.
	result.slidingWindow = nil
	result.pairTable = nil

	if err = source.uncompress(result); err != nil {
		return nil, err
	}
	return result, nil
}
// compressSparseFlavor compresses a sketch that has only a pair table.
// The pairs are unwrapped from the table, sorted, and encoded into the CSV
// stream. A sketch with a sliding window is rejected.
func (c *CpcCompressedState) compressSparseFlavor(src *CpcSketch) error {
	if src.slidingWindow != nil {
		return fmt.Errorf("compressSparseFlavor: expected slidingWindow to be nil")
	}

	table := src.pairTable
	count := table.numPairs
	sorted, err := table.unwrap(count)
	if err != nil {
		return err
	}
	introspectiveInsertionSort(sorted, 0, count-1)

	return compressTheSurprisingValues(c, src, sorted, count)
}
// uncompressSparseFlavor decodes the CSV stream into a fresh pair table and
// installs it on src. The state must have a CSV stream and no window stream.
func (c *CpcCompressedState) uncompressSparseFlavor(src *CpcSketch) error {
	switch {
	case c.CwStream != nil:
		return fmt.Errorf("uncompressSparseFlavor: expected cwStream to be nil, got %v", c.CwStream)
	case c.CsvStream == nil:
		return fmt.Errorf("uncompressSparseFlavor: csvStream is nil")
	}

	decoded, err := uncompressTheSurprisingValues(c)
	if err != nil {
		return err
	}

	rebuilt, err := newInstanceFromPairsArray(decoded, int(c.NumCsv), c.LgK)
	if err != nil {
		return err
	}
	src.pairTable = rebuilt
	return nil
}
// compressHybridFlavor compresses a sketch whose coupons are split between
// the pair table and a sliding window at offset 0. The window bits are
// turned back into pairs, merged with the sorted table pairs, and the
// combined list is encoded into the CSV stream.
func (c *CpcCompressedState) compressHybridFlavor(src *CpcSketch) error {
	const maxInt32 = uint64(1<<31 - 1)

	k := 1 << src.lgK
	table := src.pairTable
	tablePairs := table.numPairs

	// Extract and sort the pairs that live in the table.
	sortedPairs, err := table.unwrap(tablePairs)
	if err != nil {
		return err
	}
	introspectiveInsertionSort(sortedPairs, 0, tablePairs-1)

	window, offset, coupons := src.slidingWindow, src.windowOffset, src.numCoupons
	switch {
	case window == nil:
		return fmt.Errorf("compressHybridFlavor: slidingWindow is nil")
	case offset != 0:
		return fmt.Errorf("compressHybridFlavor: windowOffset must be 0, got %d", offset)
	}

	// Every coupon not in the table must be a bit in the window.
	remaining := coupons - uint64(tablePairs)
	if remaining >= maxInt32 {
		return fmt.Errorf("compressHybridFlavor: numPairs (%d) exceeds maximum int value", remaining)
	}
	windowPairs := int(remaining)
	if windowPairs+tablePairs != int(coupons) {
		return fmt.Errorf("compressHybridFlavor: invariant violation (%d + %d != %d)",
			windowPairs, tablePairs, coupons)
	}

	// The window pairs are written after tablePairs empty slots; the merge
	// then writes its output from slot 0 of the same slice.
	merged := trickyGetPairsFromWindow(window, k, windowPairs, tablePairs)
	mergePairs(sortedPairs, 0, tablePairs, merged, tablePairs, windowPairs, merged, 0)

	return compressTheSurprisingValues(c, src, merged, int(coupons))
}
// uncompressHybridFlavor decodes the CSV stream and splits the resulting
// pairs between a k-byte sliding window (columns 0..7) and a pair table
// (columns >= 8), installing both on src with windowOffset 0.
//
// It returns an error if the stream layout is wrong, if a decoded pair is
// -1, if a window pair's row is outside [0, k), or if the window offset
// derived from the state is not 0. The row check turns corrupt input into
// an error instead of an index-out-of-range panic.
func (c *CpcCompressedState) uncompressHybridFlavor(src *CpcSketch) error {
	// The hybrid flavor carries only a CSV stream.
	if c.CwStream != nil {
		return fmt.Errorf("uncompressHybridFlavor: expected CwStream to be nil, got %v", c.CwStream)
	}
	if c.CsvStream == nil {
		return fmt.Errorf("uncompressHybridFlavor: CsvStream is nil")
	}
	pairs, err := uncompressTheSurprisingValues(c)
	if err != nil {
		return err
	}
	numPairs := int(c.NumCsv)
	srcLgK := c.LgK
	k := 1 << srcLgK
	// One byte per row; bit j of window[row] represents column j.
	window := make([]byte, k)
	// Pairs with column < 8 become window bits; the rest are compacted to
	// the front of pairs and counted in nextTruePair.
	nextTruePair := 0
	for i := 0; i < numPairs; i++ {
		rowCol := pairs[i]
		if rowCol == -1 {
			return fmt.Errorf("uncompressHybridFlavor: invalid pair value -1 at index %d", i)
		}
		col := rowCol & 63
		if col < 8 {
			row := rowCol >> 6
			if row < 0 || row >= k {
				return fmt.Errorf("uncompressHybridFlavor: row %d out of range [0, %d) at index %d", row, k, i)
			}
			window[row] |= 1 << col
		} else {
			pairs[nextTruePair] = rowCol
			nextTruePair++
		}
	}
	// A hybrid sketch always has its window at offset 0.
	if c.getWindowOffset() != 0 {
		return fmt.Errorf("uncompressHybridFlavor: expected windowOffset to be 0, got %d", c.getWindowOffset())
	}
	src.windowOffset = 0
	table, err := newInstanceFromPairsArray(pairs, nextTruePair, srcLgK)
	if err != nil {
		return err
	}
	src.pairTable = table
	src.slidingWindow = window
	return nil
}
// compressPinnedFlavor compresses the sliding window and then, if the pair
// table is non-empty, the table's pairs. Each pair's column must be at
// least 8; the columns are shifted down by 8 before sorting and encoding.
func (c *CpcCompressedState) compressPinnedFlavor(src *CpcSketch) error {
	if err := c.compressTheWindow(src); err != nil {
		return err
	}

	table := src.pairTable
	count := table.numPairs
	if count <= 0 {
		// Nothing outside the window, so no CSV stream is produced.
		return nil
	}

	shifted, err := table.unwrap(count)
	if err != nil {
		return err
	}
	for idx := 0; idx < count; idx++ {
		if column := shifted[idx] & 63; column < 8 {
			return fmt.Errorf("compressPinnedFlavor: pair %d has column index less than 8", shifted[idx])
		}
		// The column lives in the low 6 bits, so subtracting 8 from the
		// whole pair lowers the column by 8 without touching the row.
		shifted[idx] -= 8
	}
	introspectiveInsertionSort(shifted, 0, count-1)

	return compressTheSurprisingValues(c, src, shifted, count)
}
// uncompressPinnedFlavor restores the sliding window and the pair table of
// a pinned-flavor sketch. With no compressed pairs an empty pair table is
// created; otherwise the decoded pairs have their columns shifted back up
// by 8 before the table is rebuilt.
func (c *CpcCompressedState) uncompressPinnedFlavor(src *CpcSketch) error {
	if c.CwStream == nil {
		return fmt.Errorf("uncompressPinnedFlavor: expected cwStream to be non-nil")
	}
	if err := uncompressTheWindow(src, c); err != nil {
		return err
	}

	lgK := c.LgK
	count := int(c.NumCsv)

	if count == 0 {
		empty, err := NewPairTable(2, 6+lgK)
		if err != nil {
			return err
		}
		src.pairTable = empty
		return nil
	}

	if c.CsvStream == nil {
		return fmt.Errorf("uncompressPinnedFlavor: expected csvStream to be non-nil")
	}
	decoded, err := uncompressTheSurprisingValues(c)
	if err != nil {
		return err
	}

	// Reverse the compressor's 8-column shift. A stored column of 56 or
	// more would leave the 6-bit column field once 8 is added.
	for idx := 0; idx < count; idx++ {
		if column := decoded[idx] & 63; column >= 56 {
			return fmt.Errorf("uncompressPinnedFlavor: invalid pair value %d at index %d", decoded[idx], idx)
		}
		decoded[idx] += 8
	}

	rebuilt, err := newInstanceFromPairsArray(decoded, count, lgK)
	if err != nil {
		return err
	}
	src.pairTable = rebuilt
	return nil
}
// compressSlidingFlavor compresses the sliding window and then, if the pair
// table is non-empty, the table's pairs. Each pair's column is rotated
// relative to the window offset, passed through the encoding permutation
// chosen by the pseudo-phase, and the pairs are then sorted and encoded.
func (c *CpcCompressedState) compressSlidingFlavor(src *CpcSketch) error {
	if err := c.compressTheWindow(src); err != nil {
		return err
	}

	table := src.pairTable
	count := table.numPairs
	if count <= 0 {
		return nil
	}

	remapped, err := table.unwrap(count)
	if err != nil {
		return err
	}

	phase := determinePseudoPhase(src.lgK, int64(src.numCoupons))
	if phase >= 16 {
		return fmt.Errorf("compressSlidingFlavor: pseudoPhase (%d) >= 16", phase)
	}
	encodeColumn := columnPermutationsForEncoding[phase]

	windowOffset := src.windowOffset
	if windowOffset <= 0 || windowOffset > 56 {
		return fmt.Errorf("compressSlidingFlavor: invalid windowOffset %d", windowOffset)
	}

	for idx := 0; idx < count; idx++ {
		pair := remapped[idx]
		// Rotate into the canonical configuration:
		// new = (old - (offset + 8)) mod 64, written as +56 to stay positive.
		rotated := ((pair & 63) + 56 - windowOffset) & 63
		if rotated >= 56 {
			return fmt.Errorf("compressSlidingFlavor: transformed column %d out of range", rotated)
		}
		remapped[idx] = ((pair >> 6) << 6) | int(encodeColumn[rotated])
	}
	introspectiveInsertionSort(remapped, 0, count-1)

	return compressTheSurprisingValues(c, src, remapped, count)
}
// uncompressSlidingFlavor restores the sliding window and the pair table of
// a sliding-flavor sketch. With no compressed pairs an empty pair table is
// created. Otherwise each decoded pair's column is mapped through the
// decoding permutation for the pseudo-phase and rotated back by the window
// offset plus 8, and the table is rebuilt from the result.
//
// It returns an error if a required stream is missing, if the pseudo-phase
// or window offset is out of range, or if a decoded column is not a valid
// index into the permutation. The last check turns a corrupt stream into an
// error instead of an index-out-of-range panic.
func (c *CpcCompressedState) uncompressSlidingFlavor(src *CpcSketch) error {
	if c.CwStream == nil {
		return fmt.Errorf("uncompressSlidingFlavor: expected cwStream to be non-nil")
	}
	if err := uncompressTheWindow(src, c); err != nil {
		return err
	}
	srcLgK := c.LgK
	numPairs := int(c.NumCsv)
	if numPairs == 0 {
		// No pairs outside the window: install an empty pair table.
		pt, err := NewPairTable(2, 6+srcLgK)
		if err != nil {
			return err
		}
		src.pairTable = pt
		return nil
	}
	if c.CsvStream == nil {
		return fmt.Errorf("uncompressSlidingFlavor: expected csvStream to be non-nil")
	}
	pairs, err := uncompressTheSurprisingValues(c)
	if err != nil {
		return err
	}
	pseudoPhase := determinePseudoPhase(srcLgK, int64(c.NumCoupons))
	if pseudoPhase >= 16 {
		return fmt.Errorf("uncompressSlidingFlavor: pseudoPhase %d out of range", pseudoPhase)
	}
	permutation := columnPermutationsForDecoding[pseudoPhase]
	// The window offset must be in (0, 56].
	offset := c.getWindowOffset()
	if offset <= 0 || offset > 56 {
		return fmt.Errorf("uncompressSlidingFlavor: invalid window offset %d", offset)
	}
	for i := 0; i < numPairs; i++ {
		rowCol := pairs[i]
		row := rowCol >> 6
		col := rowCol & 63
		// The compressor only emits permuted columns below 56; anything
		// beyond the permutation's length means the stream is corrupt.
		if col >= len(permutation) {
			return fmt.Errorf("uncompressSlidingFlavor: invalid pair value %d at index %d", rowCol, i)
		}
		// Undo the permutation first.
		col = int(permutation[col])
		// Then undo the rotation: old = (new + (offset + 8)) mod 64.
		col = (col + (offset + 8)) & 63
		pairs[i] = (row << 6) | col
	}
	table, err := newInstanceFromPairsArray(pairs, numPairs, srcLgK)
	if err != nil {
		return err
	}
	src.pairTable = table
	return nil
}
// importFromMemory parses a serialized, compressed CPC image into a
// CpcCompressedState.
//
// The low preamble is validated and the image must be flagged as
// compressed. The format ordinal then decides which preamble fields and
// streams are read: bit 0 clear means merged (MergeFlag true), bit 1 means
// a CSV stream is present, and bit 2 means a window stream is present. For
// every non-empty format the declared stream lengths are checked against
// len(bytes) before the streams are read.
//
// An unrecognised format yields an error rather than a panic, since the
// input may come from an untrusted source.
func importFromMemory(bytes []byte) (*CpcCompressedState, error) {
	if err := checkLoPreamble(bytes); err != nil {
		return nil, err
	}
	if !isCompressed(bytes) {
		return nil, fmt.Errorf("not compressed")
	}
	lgK := getLgK(bytes)
	seedHash := getSeedHash(bytes)
	state := NewCpcCompressedState(lgK, seedHash)
	fmtOrd := getFormatOrdinal(bytes)
	format := CpcFormat(fmtOrd)
	state.MergeFlag = (fmtOrd & 1) == 0
	state.CsvIsValid = (fmtOrd & 2) > 0
	state.WindowIsValid = (fmtOrd & 4) > 0
	switch format {
	case CpcFormatEmptyMerged, CpcFormatEmptyHip:
		// Empty sketches only need the fixed 8-byte preamble.
		if err := checkCapacity(len(bytes), 8); err != nil {
			return nil, err
		}
	case CpcFormatSparseHybridMerged:
		state.NumCoupons = getNumCoupons(bytes)
		// Sparse/hybrid images encode every coupon as a surprising value.
		state.NumCsv = state.NumCoupons
		state.CsvLengthInts = getSvLengthInts(bytes)
		if err := checkCapacity(len(bytes), state.getRequiredSerializedBytes()); err != nil {
			return nil, err
		}
		state.CsvStream = getSvStream(bytes)
	case CpcFormatSparseHybridHip:
		state.NumCoupons = getNumCoupons(bytes)
		state.NumCsv = state.NumCoupons
		state.CsvLengthInts = getSvLengthInts(bytes)
		state.Kxp = getKxP(bytes)
		state.HipEstAccum = getHipAccum(bytes)
		if err := checkCapacity(len(bytes), state.getRequiredSerializedBytes()); err != nil {
			return nil, err
		}
		state.CsvStream = getSvStream(bytes)
	case CpcFormatPinnedSlidingMergedNosv:
		state.FiCol = getFiCol(bytes)
		state.NumCoupons = getNumCoupons(bytes)
		state.CwLengthInts = getWLengthInts(bytes)
		if err := checkCapacity(len(bytes), state.getRequiredSerializedBytes()); err != nil {
			return nil, err
		}
		state.CwStream = getWStream(bytes)
	case CpcFormatPinnedSlidingHipNosv:
		state.FiCol = getFiCol(bytes)
		state.NumCoupons = getNumCoupons(bytes)
		state.CwLengthInts = getWLengthInts(bytes)
		state.Kxp = getKxP(bytes)
		state.HipEstAccum = getHipAccum(bytes)
		if err := checkCapacity(len(bytes), state.getRequiredSerializedBytes()); err != nil {
			return nil, err
		}
		state.CwStream = getWStream(bytes)
	case CpcFormatPinnedSlidingMerged:
		state.FiCol = getFiCol(bytes)
		state.NumCoupons = getNumCoupons(bytes)
		state.NumCsv = getNumSV(bytes)
		state.CsvLengthInts = getSvLengthInts(bytes)
		state.CwLengthInts = getWLengthInts(bytes)
		if err := checkCapacity(len(bytes), state.getRequiredSerializedBytes()); err != nil {
			return nil, err
		}
		state.CwStream = getWStream(bytes)
		state.CsvStream = getSvStream(bytes)
	case CpcFormatPinnedSlidingHip:
		state.FiCol = getFiCol(bytes)
		state.NumCoupons = getNumCoupons(bytes)
		state.NumCsv = getNumSV(bytes)
		state.CsvLengthInts = getSvLengthInts(bytes)
		state.CwLengthInts = getWLengthInts(bytes)
		state.Kxp = getKxP(bytes)
		state.HipEstAccum = getHipAccum(bytes)
		if err := checkCapacity(len(bytes), state.getRequiredSerializedBytes()); err != nil {
			return nil, err
		}
		state.CwStream = getWStream(bytes)
		state.CsvStream = getSvStream(bytes)
	default:
		return nil, fmt.Errorf("importFromMemory: format %v not implemented", format)
	}
	return state, nil
}
// exportToMemory serializes the compressed state into a freshly allocated
// byte slice of exactly getRequiredSerializedBytes() bytes, using the put
// helper that matches the state's format.
func (c *CpcCompressedState) exportToMemory() ([]byte, error) {
	size := c.getRequiredSerializedBytes()
	out := make([]byte, size)

	nCoupons := int(c.NumCoupons)
	nCsv := int(c.NumCsv)

	var err error
	switch layout := c.getFormat(); layout {
	case CpcFormatEmptyMerged:
		err = putEmptyMerged(out, c.LgK, c.SeedHash)
	case CpcFormatEmptyHip:
		err = putEmptyHip(out, c.LgK, c.SeedHash)
	case CpcFormatSparseHybridMerged:
		err = putSparseHybridMerged(out, c.LgK, nCoupons, c.CsvLengthInts, c.SeedHash, c.CsvStream)
	case CpcFormatSparseHybridHip:
		err = putSparseHybridHip(out, c.LgK, nCoupons, c.CsvLengthInts, c.Kxp, c.HipEstAccum, c.SeedHash, c.CsvStream)
	case CpcFormatPinnedSlidingMergedNosv:
		err = putPinnedSlidingMergedNoSv(out, c.LgK, c.FiCol, nCoupons, c.CwLengthInts, c.SeedHash, c.CwStream)
	case CpcFormatPinnedSlidingHipNosv:
		err = putPinnedSlidingHipNoSv(out, c.LgK, c.FiCol, nCoupons, c.CwLengthInts, c.Kxp, c.HipEstAccum, c.SeedHash, c.CwStream)
	case CpcFormatPinnedSlidingMerged:
		err = putPinnedSlidingMerged(out, c.LgK, c.FiCol, nCoupons, nCsv, c.CsvLengthInts, c.CwLengthInts, c.SeedHash, c.CsvStream, c.CwStream)
	case CpcFormatPinnedSlidingHip:
		err = putPinnedSlidingHip(out, c.LgK, c.FiCol, nCoupons, nCsv, c.Kxp, c.HipEstAccum, c.CsvLengthInts, c.CwLengthInts, c.SeedHash, c.CsvStream, c.CwStream)
	default:
		err = fmt.Errorf("exportToMemory: format %v not implemented", layout)
	}
	if err != nil {
		return nil, err
	}

	if err = checkCapacity(len(out), size); err != nil {
		return nil, err
	}
	return out, nil
}
// compressTheSurprisingValues encodes the first numPairs entries of pairs
// into a newly allocated buffer and records the result on target: NumCsv,
// CsvLengthInts (words actually used) and CsvStream (the whole buffer,
// which may be longer than CsvLengthInts). numPairs must be positive.
func compressTheSurprisingValues(target *CpcCompressedState, source *CpcSketch, pairs []int, numPairs int) error {
	if numPairs <= 0 {
		return fmt.Errorf("compressTheSurprisingValues: numPairs must be > 0, got %d", numPairs)
	}
	target.NumCsv = uint64(numPairs)

	k := 1 << source.lgK
	baseBits := golombChooseNumberOfBaseBits(k+numPairs, numPairs)

	// Allocate for the worst case; the encoder reports how much it used.
	buf := make([]int, safeLengthForCompressedPairBuf(k, numPairs, baseBits))
	target.CsvLengthInts = lowLevelCompressPairs(pairs, numPairs, baseBits, buf)
	target.CsvStream = buf
	return nil
}
// uncompressTheSurprisingValues decodes source.CsvStream into a new slice
// of source.NumCsv pairs. The Golomb base-bit count is recomputed with the
// same inputs the compressor used. NumCsv must be positive.
func uncompressTheSurprisingValues(source *CpcCompressedState) ([]int, error) {
	k := 1 << source.LgK
	count := int(source.NumCsv)
	if count <= 0 {
		return nil, fmt.Errorf("uncompressTheSurprisingValues: numPairs must be > 0, got %d", count)
	}

	decoded := make([]int, count)
	baseBits := golombChooseNumberOfBaseBits(k+count, count)
	err := lowLevelUncompressPairs(decoded, count, baseBits, source.CsvStream, source.CsvLengthInts)
	if err != nil {
		return nil, err
	}
	return decoded, nil
}
// golombChooseNumberOfBaseBits returns floor(log2((k-count)/count)) using
// integer division, or 0 when that quotient is 0. It panics if k or count
// is less than 1.
func golombChooseNumberOfBaseBits(k, count int) int {
	if k < 1 || count < 1 {
		panic("golombChooseNumberOfBaseBits: k and count must be >= 1")
	}
	if ratio := (k - count) / count; ratio != 0 {
		return floorLog2(uint64(ratio))
	}
	return 0
}
// floorLog2 returns the index of the highest set bit of x, i.e.
// floor(log2(x)) for x > 0. For x == 0 it returns -1.
func floorLog2(x uint64) int {
	const topBit = 63
	return topBit - bits.LeadingZeros64(x)
}
// safeLengthForCompressedPairBuf returns a buffer length, in 32-bit words,
// that is large enough to hold numPairs compressed pairs. The bit budget is
// 12 bits per pair for the column code, (1 + numBaseBits) bits per pair plus
// k >> numBaseBits bits for the row code, and max(0, 10 - numBaseBits) bits
// of trailing padding. It panics if numPairs is not positive or if the word
// count does not fit in 31 bits.
func safeLengthForCompressedPairBuf(k, numPairs, numBaseBits int) int {
	if numPairs <= 0 {
		panic("safeLengthForCompressedPairBuf: numPairs must be > 0")
	}

	var padBits int64
	if numBaseBits < 10 {
		padBits = int64(10 - numBaseBits)
	}
	colBits := int64(12 * numPairs)
	rowBits := int64(numPairs)*(int64(numBaseBits)+1) + (int64(k) >> uint(numBaseBits))

	wordCount := divideBy32RoundingUp(colBits + rowBits + padBits)
	if wordCount >= (1 << 31) {
		panic("safeLengthForCompressedPairBuf: words too large")
	}
	return int(wordCount)
}
// divideBy32RoundingUp returns x / 32 rounded towards positive infinity.
func divideBy32RoundingUp(x int64) int64 {
	quotient := x >> 5
	// Any of the low five bits being set means there is a remainder.
	if x&31 != 0 {
		quotient++
	}
	return quotient
}
// lowLevelCompressPairs encodes the first numPairsToEncode entries of
// pairArray into compressedWords and returns the number of words written.
//
// Each pair is (row << 6) | col. Pairs are encoded as deltas against a
// prediction: the row delta from the previous pair's row, and the column
// delta from one past the previous column (or from 0 when the row changes).
// The column delta is emitted with a code looked up in
// lengthLimitedUnaryEncodingTable65. The row delta is Golomb-coded: its
// high part in unary via writeUnary, its low numBaseBits bits verbatim.
//
// The function panics if a pair's row or column is below the prediction,
// which is what happens when the input is not sorted ascending. The caller
// must supply a compressedWords slice that is large enough; no bounds are
// checked here.
func lowLevelCompressPairs(pairArray []int, numPairsToEncode, numBaseBits int, compressedWords []int) int {
	nextWordIndex := 0
	var bitBuf uint64 = 0
	bufBits := 0
	// Scratch slice used to pass the bit-stream state to writeUnary and back.
	ptrArr := make([]int64, 3)
	// Mask selecting the low numBaseBits bits of a row delta.
	golombLoMask := (uint64(1) << uint(numBaseBits)) - 1
	predictedRowIndex := 0
	predictedColIndex := 0
	for pairIndex := 0; pairIndex < numPairsToEncode; pairIndex++ {
		rowCol := pairArray[pairIndex]
		// Row index is in the upper bits, column index in the low 6 bits.
		rowIndex := rowCol >> 6
		colIndex := rowCol & 0x3F // 0x3F == 63
		// A new row restarts the column prediction.
		if rowIndex != predictedRowIndex {
			predictedColIndex = 0
		}
		if rowIndex < predictedRowIndex || colIndex < predictedColIndex {
			panic(fmt.Sprintf("lowLevelCompressPairs: assertion failed: rowIndex=%d, predictedRowIndex=%d, colIndex=%d, predictedColIndex=%d",
				rowIndex, predictedRowIndex, colIndex, predictedColIndex))
		}
		// yDelta is the difference in row indices.
		yDelta := uint64(rowIndex - predictedRowIndex)
		// xDelta is the difference in column indices.
		xDelta := colIndex - predictedColIndex
		predictedRowIndex = rowIndex
		predictedColIndex = colIndex + 1
		// Table entry: low 12 bits are the code value, the bits above are
		// the code length.
		codeInfo := uint64(lengthLimitedUnaryEncodingTable65[xDelta]) & 0xFFFF
		codeVal := codeInfo & 0xFFF
		codeLen := int(codeInfo >> 12)
		// Append the column code above the bits already pending.
		bitBuf |= codeVal << uint(bufBits)
		bufBits += codeLen
		// Flush one 32-bit word once at least 32 bits are pending.
		if bufBits >= 32 {
			compressedWords[nextWordIndex] = int(bitBuf & 0xFFFFFFFF)
			nextWordIndex++
			bitBuf >>= 32
			bufBits -= 32
		}
		// Split the row delta into its Golomb low and high parts.
		golombLo := yDelta & golombLoMask
		golombHi := yDelta >> uint(numBaseBits)
		// Hand the stream state to writeUnary through ptrArr.
		ptrArr[NextWordIdx] = int64(nextWordIndex)
		ptrArr[BitBuf] = int64(bitBuf)
		ptrArr[BufBits] = int64(bufBits)
		// Emit golombHi in unary.
		writeUnary(compressedWords, ptrArr, int(golombHi))
		// Read the updated stream state back.
		nextWordIndex = int(ptrArr[NextWordIdx])
		bitBuf = uint64(ptrArr[BitBuf])
		bufBits = int(ptrArr[BufBits])
		// Append the low bits of the Golomb code.
		bitBuf |= golombLo << uint(bufBits)
		bufBits += numBaseBits
		if bufBits >= 32 {
			compressedWords[nextWordIndex] = int(bitBuf & 0xFFFFFFFF)
			nextWordIndex++
			bitBuf >>= 32
			bufBits -= 32
		}
	}
	// Pad the bitstream so that the decompressor's 12-bit peek can't overrun its input.
	padding := 10 - numBaseBits
	if padding < 0 {
		padding = 0
	}
	bufBits += padding
	if bufBits >= 32 {
		compressedWords[nextWordIndex] = int(bitBuf & 0xFFFFFFFF)
		nextWordIndex++
		bitBuf >>= 32
		bufBits -= 32
	}
	if bufBits > 0 {
		// Flush any remaining bits.
		compressedWords[nextWordIndex] = int(bitBuf & 0xFFFFFFFF)
		nextWordIndex++
	}
	return nextWordIndex
}
// lowLevelUncompressPairs decodes numPairsToDecode pairs from
// compressedWords into pairArray, reversing lowLevelCompressPairs.
//
// For each pair it peeks 12 bits and looks them up in
// lengthLimitedUnaryDecodingTable65 to get the column delta and its code
// length. It then reads the unary high part of the row delta with readUnary
// and takes numBaseBits low bits verbatim. Row and column are rebuilt from
// the same running prediction the compressor used.
//
// It returns an error if the input runs out while refilling here, or if
// more than numCompressedWords words were consumed. Running out of input
// inside readUnary panics instead.
func lowLevelUncompressPairs(pairArray []int, numPairsToDecode, numBaseBits int, compressedWords []int, numCompressedWords int) error {
	// Output index for pairArray.
	pairIndex := 0
	// Scratch slice used to pass the bit-stream state to readUnary and back.
	ptrArr := make([]int64, 3)
	nextWordIndex := 0
	var bitBuf uint64 = 0
	bufBits := 0
	// Mask selecting the low numBaseBits bits of a row delta.
	golombLoMask := (uint64(1) << uint(numBaseBits)) - 1
	predictedRowIndex := 0
	predictedColIndex := 0
	for pairIndex < numPairsToDecode {
		// Refill so that at least 12 bits are available for the peek.
		if bufBits < 12 {
			if nextWordIndex >= len(compressedWords) {
				return fmt.Errorf("lowLevelUncompressPairs: insufficient compressedWords data")
			}
			bitBuf |= (uint64(compressedWords[nextWordIndex]) & 0xFFFFFFFF) << uint(bufBits)
			nextWordIndex++
			bufBits += 32
		}
		// Peek 12 bits and decode the column delta.
		peek12 := int(bitBuf & 0xFFF) // 0xFFF is 12 bits.
		// Table entry: high byte is the code length, low byte is xDelta.
		lookup := int(lengthLimitedUnaryDecodingTable65[peek12]) & 0xFFFF
		codeWordLength := lookup >> 8
		xDelta := lookup & 0xFF
		// Consume only the bits that belonged to the column code.
		bitBuf >>= uint(codeWordLength)
		bufBits -= codeWordLength
		// Hand the stream state to readUnary through ptrArr.
		ptrArr[NextWordIdx] = int64(nextWordIndex)
		ptrArr[BitBuf] = int64(bitBuf)
		ptrArr[BufBits] = int64(bufBits)
		golombHi := readUnary(compressedWords, ptrArr)
		// Read the updated stream state back.
		nextWordIndex = int(ptrArr[NextWordIdx])
		bitBuf = uint64(ptrArr[BitBuf])
		bufBits = int(ptrArr[BufBits])
		// Refill so that at least numBaseBits bits are available.
		if bufBits < numBaseBits {
			if nextWordIndex >= len(compressedWords) {
				return fmt.Errorf("lowLevelUncompressPairs: insufficient compressedWords data for golombLo")
			}
			bitBuf |= (uint64(compressedWords[nextWordIndex]) & 0xFFFFFFFF) << uint(bufBits)
			nextWordIndex++
			bufBits += 32
		}
		golombLo := bitBuf & golombLoMask
		bitBuf >>= uint(numBaseBits)
		bufBits -= numBaseBits
		// yDelta is the combination of the unary high and the base bits.
		yDelta := (uint64(golombHi) << uint(numBaseBits)) | golombLo
		// A row change restarts the column prediction.
		if yDelta > 0 {
			predictedColIndex = 0
		}
		rowIndex := predictedRowIndex + int(yDelta)
		colIndex := predictedColIndex + xDelta
		rowCol := (rowIndex << 6) | colIndex
		pairArray[pairIndex] = rowCol
		pairIndex++
		predictedRowIndex = rowIndex
		predictedColIndex = colIndex + 1
	}
	// Reading past the declared stream length means the input is inconsistent.
	if nextWordIndex > numCompressedWords {
		return fmt.Errorf("lowLevelUncompressPairs: nextWordIndex %d exceeds numCompressedWords %d", nextWordIndex, numCompressedWords)
	}
	return nil
}
// readUnary decodes one unary-coded value (a run of zero bits terminated by
// a one bit) from the bit stream and returns the length of the zero run.
// The stream state is read from and written back to ptrArr at indices
// NextWordIdx, BitBuf and BufBits. It panics if compressedWords is
// exhausted while refilling.
func readUnary(compressedWords []int, ptrArr []int64) int64 {
	wordPos := int(ptrArr[NextWordIdx])
	acc := uint64(ptrArr[BitBuf])
	avail := int(ptrArr[BufBits])

	var zeroRun int64
	for {
		// Refill so that a whole byte can be inspected.
		if avail < 8 {
			if wordPos >= len(compressedWords) {
				panic("readUnary: insufficient compressedWords data")
			}
			acc |= (uint64(compressedWords[wordPos]) & 0xFFFFFFFF) << uint(avail)
			wordPos++
			avail += 32
		}

		zeros := bits.TrailingZeros8(uint8(acc))
		if zeros < 8 {
			// Found the terminating one: consume it and the zeros before it.
			avail -= zeros + 1
			acc >>= uint(zeros + 1)

			ptrArr[NextWordIdx] = int64(wordPos)
			ptrArr[BitBuf] = int64(acc)
			ptrArr[BufBits] = int64(avail)
			return zeroRun + int64(zeros)
		}

		// The whole byte was zero: the codeword continues in the next byte.
		zeroRun += 8
		avail -= 8
		acc >>= 8
	}
}
// writeUnary appends theValue zero bits followed by a single one bit to the
// bit stream, flushing completed 32-bit words into compressedWords. The
// stream state is read from and written back to ptrArr at indices
// NextWordIdx, BitBuf and BufBits.
func writeUnary(compressedWords []int, ptrArr []int64, theValue int) {
	wordPos := int(ptrArr[NextWordIdx])
	acc := uint64(ptrArr[BitBuf])
	pending := int(ptrArr[BufBits])

	// flushIfFull emits one word once at least 32 bits are pending.
	flushIfFull := func() {
		if pending >= 32 {
			compressedWords[wordPos] = int(acc & 0xFFFFFFFF)
			wordPos++
			acc >>= 32
			pending -= 32
		}
	}

	// Long zero runs go out 16 bits at a time; the bits are already zero
	// in acc, so only the pending count advances.
	left := theValue
	for ; left >= 16; left -= 16 {
		pending += 16
		flushIfFull()
	}

	// The remaining 0..15 zeros and the terminating one bit.
	acc |= (uint64(1) << uint(left)) << uint(pending)
	pending += left + 1
	flushIfFull()

	ptrArr[NextWordIdx] = int64(wordPos)
	ptrArr[BitBuf] = int64(acc)
	ptrArr[BufBits] = int64(pending)
}
// trickyGetPairsFromWindow converts the set bits in the first k bytes of
// window into pairs encoded as (row << 6) | col, in row-major order with
// ascending columns. The result has length emptySpace + numPairsToGet: the
// first emptySpace slots are left zero and the pairs follow. It panics if
// the number of set bits does not equal numPairsToGet.
func trickyGetPairsFromWindow(window []byte, k, numPairsToGet, emptySpace int) []int {
	total := emptySpace + numPairsToGet
	out := make([]int, total)

	next := emptySpace
	for row := 0; row < k; row++ {
		for rowBits := window[row]; rowBits != 0; rowBits &= rowBits - 1 {
			// The lowest set bit is the next column; the loop's post
			// statement clears it.
			col := bits.TrailingZeros8(rowBits)
			out[next] = (row << 6) | col
			next++
		}
	}

	if next != total {
		panic(fmt.Sprintf("trickyGetPairsFromWindow: pairIndex (%d) != outputLength (%d)", next, total))
	}
	return out
}
// compressTheWindow compresses the first k = 1 << lgK bytes of the sketch's
// sliding window with the byte-encoding table selected by the pseudo-phase.
// It stores the output buffer in c.CwStream and the number of words used in
// c.CwLengthInts.
//
// It returns an error if the sliding window is nil or shorter than k bytes,
// rather than letting the encoder index past the end of the slice.
func (c *CpcCompressedState) compressTheWindow(src *CpcSketch) error {
	srcLgK := src.lgK
	srcK := 1 << srcLgK
	if len(src.slidingWindow) < srcK {
		return fmt.Errorf("compressTheWindow: slidingWindow has %d bytes, need %d", len(src.slidingWindow), srcK)
	}
	// Worst-case output size; the encoder reports how much it used.
	windowBufLen := safeLengthForCompressedWindowBuf(int64(srcK))
	windowBuf := make([]int, windowBufLen)
	// The pseudo-phase selects which encoding table to use.
	pseudoPhase := determinePseudoPhase(srcLgK, int64(src.numCoupons))
	// lowLevelCompressBytes returns the number of words written to windowBuf.
	cwLengthInts := lowLevelCompressBytes(src.slidingWindow, srcK, encodingTablesForHighEntropyByte[pseudoPhase], windowBuf)
	c.CwLengthInts = cwLengthInts
	c.CwStream = windowBuf
	return nil
}
// uncompressTheWindow decodes source.CwStream into a new k-byte sliding
// window (k = 1 << source.LgK) and installs it on target.
//
// target must not already have a sliding window and source must have a
// window stream. All validation and decoding happen before target is
// touched, so target.slidingWindow is assigned only when the call succeeds.
func uncompressTheWindow(target *CpcSketch, source *CpcCompressedState) error {
	if target.slidingWindow != nil {
		return fmt.Errorf("uncompressTheWindow: target.slidingWindow is already set")
	}
	if source.CwStream == nil {
		return fmt.Errorf("uncompressTheWindow: source.CwStream is nil")
	}
	srcLgK := source.LgK
	srcK := 1 << srcLgK
	// One byte per row, zeroed by make.
	window := make([]byte, srcK)
	// The pseudo-phase selects which decoding table to use; it must match
	// the one the compressor derived from the same lgK and coupon count.
	pseudoPhase := determinePseudoPhase(srcLgK, int64(source.NumCoupons))
	err := lowLevelUncompressBytes(window, srcK,
		decodingTablesForHighEntropyByte[pseudoPhase],
		source.CwStream,
		source.CwLengthInts)
	if err != nil {
		return err
	}
	target.slidingWindow = window
	return nil
}
// safeLengthForCompressedWindowBuf returns the buffer length, in 32-bit words,
// to allocate for compressing a window of k rows (typically k = 1 << lgK).
// It budgets 12 bits per row plus 11 bits of padding and rounds the total up
// to a whole number of words.
func safeLengthForCompressedWindowBuf(k int64) int {
	const (
		bitsPerRow  = 12 // budget for one row's code word
		paddingBits = 11 // trailing padding bits
	)
	return int(divideBy32RoundingUp(bitsPerRow*k + paddingBits))
}
// determinePseudoPhase picks the index of the high-entropy-byte coding table
// to use for a sketch with the given lgK and coupon count.
//
// With k = 1 << lgK:
//   - While numCoupons/k < 2.375 (midrange), the result is one of the midrange
//     indices 16..21, selected by comparing numCoupons/k against the thresholds
//     0.75, 1.1, 1.32, 5/3, 1.965 and 2.275, or 6 for the remainder of that
//     range (the steady-state table employed before its actual phase).
//   - Otherwise (steady state), the result is (numCoupons >> (lgK-4)) & 15,
//     a value in 0..15.
//
// It panics if the steady-state branch is reached with lgK < 4, because the
// shift amount lgK-4 would be negative.
func determinePseudoPhase(lgK int, numCoupons int64) int {
	k := int64(1) << uint(lgK)
	c := numCoupons

	// Every ratio test is cross-multiplied so it stays in integer arithmetic.
	switch {
	case 1000*c >= 2375*k:
		// Steady-state logic.
		if lgK < 4 {
			panic("determinePseudoPhase: lgK must be at least 4")
		}
		// The mask confines the result to 0..15, so no further range check
		// is required.
		return int((c >> uint(lgK-4)) & 15)
	case 4*c < 3*k:
		return 16 + 0
	case 10*c < 11*k:
		return 16 + 1
	case 100*c < 132*k:
		return 16 + 2
	case 3*c < 5*k:
		return 16 + 3
	case 1000*c < 1965*k:
		return 16 + 4
	case 1000*c < 2275*k:
		return 16 + 5
	default:
		return 6 // steady-state table employed before its actual phase.
	}
}
// lowLevelCompressBytes encodes the first numBytesToEncode bytes of byteArray
// into compressedWords and returns the number of words written.
//
// Each input byte indexes encodingTable. The low 12 bits of the entry are the
// code value and the bits above them are the code word length. Code values are
// packed least-significant-bit first, and each output word carries 32 bits of
// the stream. After the last byte, 11 zero bits are appended so that the
// decompressor's 12-bit peek cannot overrun.
//
// compressedWords must be large enough for the output; no bounds are checked
// here beyond Go's own slice indexing.
func lowLevelCompressBytes(byteArray []byte, numBytesToEncode int, encodingTable []uint16, compressedWords []int) int {
	var (
		acc     uint64 // bits waiting to be written, lowest bit first
		pending int    // number of valid bits in acc
		written int    // number of words stored so far
	)

	// emitWord stores the low 32 bits of acc as the next output word and
	// drops them from the accumulator.
	emitWord := func() {
		compressedWords[written] = int(acc & 0xFFFFFFFF)
		written++
		acc >>= 32
		pending -= 32
	}

	for i := 0; i < numBytesToEncode; i++ {
		entry := uint64(encodingTable[byteArray[i]])
		acc |= (entry & 0xFFF) << uint(pending)
		pending += int(entry >> 12)
		if pending >= 32 {
			emitWord()
		}
	}

	// Pad with 11 zero-bits so that the decompressor's 12-bit peek cannot overrun.
	pending += 11
	if pending >= 32 {
		emitWord()
	}

	// Whatever is left is fewer than 32 bits and goes out as a final word.
	if pending > 0 {
		emitWord()
	}
	return written
}
// lowLevelUncompressBytes decodes numBytesToDecode bytes from the bit stream in
// compressedWords into byteArray.
//
// Each element of compressedWords contributes its low 32 bits to the stream,
// least-significant bit first. For every output byte the next 12 bits of the
// stream are used to index decodingTable. The low 8 bits of the entry are the
// decoded byte and the bits above them are the number of stream bits to
// consume.
//
// numCompressedWords is the number of words the caller expects to be consumed.
// compressedWords itself may be longer.
//
// An error is returned when:
//   - byteArray, decodingTable or compressedWords is nil;
//   - byteArray is shorter than numBytesToDecode;
//   - the stream runs out of words before all bytes are decoded;
//   - a 12-bit peek falls outside decodingTable;
//   - more than numCompressedWords words were consumed.
//
// byteArray may have been partially or fully written when an error is
// returned from inside or after the decoding loop.
func lowLevelUncompressBytes(byteArray []byte, numBytesToDecode int, decodingTable []uint16, compressedWords []int, numCompressedWords int) error {
	// Precondition checks.
	if byteArray == nil {
		return fmt.Errorf("lowLevelUncompressBytes: byteArray is nil")
	}
	if decodingTable == nil {
		return fmt.Errorf("lowLevelUncompressBytes: decodingTable is nil")
	}
	if compressedWords == nil {
		return fmt.Errorf("lowLevelUncompressBytes: compressedWords is nil")
	}
	// Reject an undersized output buffer up front rather than panicking on an
	// out-of-range write midway through decoding.
	if numBytesToDecode > len(byteArray) {
		return fmt.Errorf("lowLevelUncompressBytes: numBytesToDecode (%d) exceeds byteArray length (%d)", numBytesToDecode, len(byteArray))
	}

	nextWordIndex := 0
	var bitBuf uint64 // undecoded bits, lowest bit first
	bufBits := 0      // number of valid bits in bitBuf

	for byteIndex := 0; byteIndex < numBytesToDecode; byteIndex++ {
		// Ensure there are at least 12 bits in bitBuf.
		if bufBits < 12 {
			if nextWordIndex >= len(compressedWords) {
				return fmt.Errorf("lowLevelUncompressBytes: insufficient compressedWords data")
			}
			// Append the next 32 bits above the bits already buffered.
			bitBuf |= (uint64(compressedWords[nextWordIndex]) & 0xFFFFFFFF) << uint(bufBits)
			nextWordIndex++
			bufBits += 32
		}

		// Peek 12 bits and look them up.
		peek12 := int(bitBuf & 0xFFF)
		if peek12 >= len(decodingTable) {
			return fmt.Errorf("lowLevelUncompressBytes: decodingTable has %d entries, cannot look up index %d", len(decodingTable), peek12)
		}
		lookup := int(decodingTable[peek12])
		codeWordLength := lookup >> 8
		byteArray[byteIndex] = byte(lookup & 0xFF)

		// Consume the codeword bits.
		bitBuf >>= uint(codeWordLength)
		bufBits -= codeWordLength
	}

	// Check that we did not consume more words than the caller expected.
	if nextWordIndex > numCompressedWords {
		return fmt.Errorf("lowLevelUncompressBytes: nextWordIndex (%d) exceeds expected (%d)", nextWordIndex, numCompressedWords)
	}
	return nil
}