frequencies/reverse_purge_item_hash_map.go (256 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package frequencies
import (
"fmt"
"github.com/apache/datasketches-go/common"
"github.com/apache/datasketches-go/internal"
"math/bits"
"strings"
)
// reversePurgeItemHashMap is an open-addressing hash table that maps keys of type C to
// int64 values. keys, values and states are parallel slices indexed by slot.
type reversePurgeItemHashMap[C comparable] struct {
// lgLength is the base-2 logarithm of the slot count (taken from internal.ExactLog2 at
// construction and from bits.TrailingZeros64 in resize).
lgLength int
// loadThreshold is the slot count times reversePurgeItemHashMapLoadFactor, truncated to int.
loadThreshold int
// keys holds the key stored in each slot.
keys []C
// values holds the value associated with the key in the same slot.
values []int64
// states marks slot occupancy: 0 means empty; an occupied slot stores the 1-based probe
// distance ("drift") recorded when its key was placed.
states []int16
// numActive counts the occupied slots.
numActive int
// hasher supplies the hash used to pick a key's starting slot.
hasher common.ItemSketchHasher[C]
// serde is stored by the constructor; none of the methods visible here read it.
serde common.ItemSketchSerde[C]
}
// iteratorItemHashMap walks the occupied slots of a set of parallel key/value/state
// slices, stepping by a fixed stride and wrapping with a bit mask.
type iteratorItemHashMap[C comparable] struct {
// keys_, values_ and states_ are the slices handed to newIteratorItems; they are
// stored as given, not copied.
keys_ []C
values_ []int64
states_ []int16
// numActive_ is the number of occupied slots the iterator will report before stopping.
numActive_ int
// stride_ is the amount added to the slot index on every step.
stride_ int
// mask_ is len(keys)-1 and is AND-ed with the index to wrap it into range.
mask_ int
// i_ is the current slot index; it starts at -stride_ so the first step lands on slot 0.
i_ int
// count_ is the number of occupied slots reported so far.
count_ int
}
// reversePurgeItemHashMapLoadFactor is the fraction of the table length used to derive
// loadThreshold (table length * load factor, truncated to an int).
const reversePurgeItemHashMapLoadFactor float64 = 0.75
// newReversePurgeItemHashMap builds an empty map whose backing slices each have
// mapSize slots.
//
// mapSize must be a power of two so that slot indices can be derived with a bit mask;
// any error reported by internal.ExactLog2(mapSize) is returned unchanged together with
// a nil map.
//
// The load threshold is reversePurgeItemHashMapLoadFactor * mapSize truncated to an int.
// hasher and serde are stored on the map as given.
func newReversePurgeItemHashMap[C comparable](mapSize int, hasher common.ItemSketchHasher[C], serde common.ItemSketchSerde[C]) (*reversePurgeItemHashMap[C], error) {
	lg, err := internal.ExactLog2(mapSize)
	if err != nil {
		return nil, err
	}
	m := &reversePurgeItemHashMap[C]{
		lgLength:      lg,
		loadThreshold: int(float64(mapSize) * reversePurgeItemHashMapLoadFactor),
		keys:          make([]C, mapSize),
		values:        make([]int64, mapSize),
		states:        make([]int16, mapSize),
		numActive:     0,
		hasher:        hasher,
		serde:         serde,
	}
	return m, nil
}
// get looks up the value stored for key.
//
// It returns (0, nil) when internal.IsNil(key) reports true or when the slot that
// hashProbe resolves for key is not occupied. If that slot is occupied but holds a
// different key, it returns a "key not found" error. Otherwise it returns the stored value.
func (r *reversePurgeItemHashMap[C]) get(key C) (int64, error) {
	if internal.IsNil(key) {
		return 0, nil
	}
	slot := r.hashProbe(key)
	switch {
	case r.states[slot] <= 0:
		// Probing ended on an empty slot: the key is absent.
		return 0, nil
	case r.keys[slot] != key:
		return 0, fmt.Errorf("key not found")
	default:
		return r.values[slot], nil
	}
}
// getCapacity reports the map's current load threshold.
func (r *reversePurgeItemHashMap[C]) getCapacity() int {
	capacity := r.loadThreshold
	return capacity
}
// adjustOrPutValue adds adjustAmount to the value mapped to key, inserting the key
// with adjustAmount as its initial value when it is not yet present.
//
//   - key, the key whose value is adjusted or inserted
//   - adjustAmount, the amount added to an existing value, or the initial value of a
//     newly inserted key
//
// An error is returned, and the map is left unchanged, when key is not present and
// numActive already exceeds loadThreshold. Adjusting an existing key never fails.
func (r *reversePurgeItemHashMap[C]) adjustOrPutValue(key C, adjustAmount int64) error {
	arrayMask := uint64(len(r.keys) - 1)
	probe := r.hasher.Hash(key) & arrayMask
	// drift is the 1-based probe distance; it becomes the state of a newly filled slot.
	drift := 1
	for r.states[probe] != 0 && r.keys[probe] != key {
		probe = (probe + 1) & arrayMask
		drift++
	}

	if r.states[probe] != 0 {
		// The loop only stops on an occupied slot when that slot holds key, so this is
		// an adjustment of an existing entry.
		r.values[probe] += adjustAmount
		return nil
	}

	// The slot is empty: insert the key unless the table is already over its threshold.
	if r.numActive > r.loadThreshold {
		return fmt.Errorf("numActive: %d > loadThreshold: %d", r.numActive, r.loadThreshold)
	}
	r.keys[probe] = key
	r.values[probe] = adjustAmount
	r.states[probe] = int16(drift)
	r.numActive++
	return nil
}
// resize replaces the backing slices with fresh ones of length newSize and re-inserts
// every occupied entry of the old table through adjustOrPutValue.
//
// lgLength is recomputed as the number of trailing zero bits of newSize and
// loadThreshold as reversePurgeItemHashMapLoadFactor * newSize truncated to an int.
// The first error returned by adjustOrPutValue aborts the rehash and is returned;
// at that point only the entries re-inserted so far are present in the new table.
func (r *reversePurgeItemHashMap[C]) resize(newSize int) error {
	prevKeys, prevValues, prevStates := r.keys, r.values, r.states

	r.keys = make([]C, newSize)
	r.values = make([]int64, newSize)
	r.states = make([]int16, newSize)
	r.lgLength = bits.TrailingZeros64(uint64(newSize))
	r.loadThreshold = int(float64(newSize) * reversePurgeItemHashMapLoadFactor)
	r.numActive = 0

	for slot, key := range prevKeys {
		if prevStates[slot] <= 0 {
			continue
		}
		if err := r.adjustOrPutValue(key, prevValues[slot]); err != nil {
			return err
		}
	}
	return nil
}
// purge lowers every stored value by a sampled amount and removes the entries whose
// value is no longer positive.
//
// It collects the values of the first min(sampleSize, numActive) occupied slots in slot
// order, asks internal.QuickSelect for the element at position limit/2 of that sample
// (presumably the sample median; confirm against QuickSelect), subtracts that amount
// from all values and then drops the non-positive entries. The subtracted amount is
// returned.
func (r *reversePurgeItemHashMap[C]) purge(sampleSize int) int64 {
	limit := min(sampleSize, r.numActive)
	samples := make([]int64, limit)

	taken := 0
	for slot := 0; taken < limit; slot++ {
		if r.states[slot] > 0 { // occupied slot
			samples[taken] = r.values[slot]
			taken++
		}
	}

	pivot := internal.QuickSelect(samples, 0, taken-1, limit/2)
	r.adjustAllValuesBy(-pivot)
	r.keepOnlyPositiveCounts()
	return pivot
}
// serializeToString renders the map as a comma-terminated list: numActive, the table
// length, then a "key,value," pair for every slot whose state is non-zero, in slot order.
func (r *reversePurgeItemHashMap[C]) serializeToString() string {
	var out strings.Builder
	fmt.Fprintf(&out, "%d,%d,", r.numActive, len(r.keys))
	for slot, key := range r.keys {
		if r.states[slot] == 0 {
			continue
		}
		fmt.Fprintf(&out, "%v,%d,", key, r.values[slot])
	}
	return out.String()
}
// adjustAllValuesBy adds adjustAmount to the value in every slot of the table,
// occupied or not. It does not remove any entry.
func (r *reversePurgeItemHashMap[C]) adjustAllValuesBy(adjustAmount int64) {
	for slot := range r.values {
		r.values[slot] += adjustAmount
	}
}
// keepOnlyPositiveCounts removes every occupied entry whose value is zero or negative,
// decrementing numActive for each removal.
//
// Slots are visited from high index to low. The scan is anchored at the last empty
// slot of the table: everything below it is processed first, and the run of slots from
// the end of the table down to that anchor is processed afterwards.
func (r *reversePurgeItemHashMap[C]) keepOnlyPositiveCounts() {
	// dropIfNonPositive deletes the entry at slot when it is occupied with a value <= 0.
	// hashDelete also pulls later entries of the same cluster back towards the freed slot.
	dropIfNonPositive := func(slot int) {
		if r.states[slot] > 0 && r.values[slot] <= 0 {
			r.hashDelete(slot)
			r.numActive--
		}
	}

	// Walk back from the end of the table to the first empty slot.
	anchor := len(r.states) - 1
	for r.states[anchor] > 0 {
		anchor--
	}

	// Everything in front of the anchor, back to front.
	for slot := anchor - 1; slot >= 0; slot-- {
		dropIfNonPositive(slot)
	}

	// The trailing run that was skipped while locating the anchor.
	for slot := len(r.states) - 1; slot >= anchor; slot-- {
		dropIfNonPositive(slot)
	}
}
// hashDelete empties the slot at deleteProbe and then repairs the probe cluster that
// follows it.
//
// Scanning forward (wrapping with the table mask) until an empty slot is reached, any
// entry whose recorded drift is larger than its distance from the current hole is moved
// into the hole with its drift reduced by that distance; the slot it came from becomes
// the new hole. Entries that cannot move that far back stay where they are.
// The caller is responsible for updating numActive.
func (r *reversePurgeItemHashMap[C]) hashDelete(deleteProbe int) {
	mask := len(r.keys) - 1 // table length must be a power of two
	hole := deleteProbe
	r.states[hole] = 0 // mark as empty

	// gap is the distance from the hole to the slot being scanned.
	gap := 1
	for scan := (hole + 1) & mask; r.states[scan] != 0; scan = (scan + 1) & mask {
		if shift := int16(gap); r.states[scan] > shift {
			// Pull this entry back into the hole.
			r.keys[hole] = r.keys[scan]
			r.values[hole] = r.values[scan]
			r.states[hole] = r.states[scan] - shift
			// The vacated slot becomes the hole for the rest of the scan.
			r.states[scan] = 0
			hole = scan
			gap = 0
		}
		gap++
	}
}
// getActiveValues returns the values of all occupied slots in slot order, or nil when
// numActive is zero.
func (r *reversePurgeItemHashMap[C]) getActiveValues() []int64 {
	if r.numActive == 0 {
		return nil
	}
	active := make([]int64, 0, r.numActive)
	for slot, value := range r.values {
		if r.states[slot] <= 0 {
			continue // empty slot
		}
		active = append(active, value)
	}
	return active
}
// getActiveKeys returns the keys of all occupied slots in slot order, or nil when
// numActive is zero.
func (r *reversePurgeItemHashMap[C]) getActiveKeys() []C {
	if r.numActive == 0 {
		return nil
	}
	active := make([]C, 0, r.numActive)
	for slot, key := range r.keys {
		if r.states[slot] <= 0 {
			continue // empty slot
		}
		active = append(active, key)
	}
	return active
}
// iterator returns a new iterator built by newIteratorItems from the map's current
// keys, values and states slices and its current numActive count.
func (r *reversePurgeItemHashMap[C]) iterator() *iteratorItemHashMap[C] {
	it := newIteratorItems(r.keys, r.values, r.states, r.numActive)
	return it
}
// hashProbe returns the slot index where key lives or, if key is absent, the first
// non-occupied slot reached by linear probing from key's hashed position.
func (r *reversePurgeItemHashMap[C]) hashProbe(key C) int {
	mask := uint64(len(r.keys) - 1)
	slot := r.hasher.Hash(key) & mask
	for {
		if r.states[slot] <= 0 || r.keys[slot] == key {
			return int(slot)
		}
		slot = (slot + 1) & mask
	}
}
// String renders a human-readable table of the map: a title line, a column header and
// one row (index, state, value, key) for every slot whose state is positive.
func (r *reversePurgeItemHashMap[C]) String() string {
	var out strings.Builder
	out.WriteString("ReversePurgeItemHashMap:\n")
	fmt.Fprintf(&out, " %12s:%11s%20s %s\n", "Index", "States", "Values", "Keys")
	for slot, key := range r.keys {
		if state := r.states[slot]; state > 0 {
			fmt.Fprintf(&out, " %12d:%11d%20d %v\n", slot, state, r.values[slot], key)
		}
	}
	return out.String()
}
// newIteratorItems builds an iterator over the given parallel slices. The slices are
// stored as passed in, not copied.
//
// The stride is len(keys) * internal.InverseGolden truncated to an integer with its
// lowest bit forced to 1, i.e. always odd (presumably so that stepping through a
// power-of-two sized table reaches every slot; confirm against the callers).
// The start index is -stride, so the first call to next lands on slot 0.
func newIteratorItems[C comparable](keys []C, values []int64, states []int16, numActive int) *iteratorItemHashMap[C] {
	length := len(keys)
	step := int(uint64(float64(length)*internal.InverseGolden) | 1)

	it := new(iteratorItemHashMap[C])
	it.keys_ = keys
	it.values_ = values
	it.states_ = states
	it.numActive_ = numActive
	it.stride_ = step
	it.mask_ = length - 1
	it.i_ = -step
	return it
}
// next advances the iterator to the next occupied slot and reports whether one was
// found. It returns false once numActive_ occupied slots have been reported.
// The index is advanced by one stride on every call, including calls that return false.
func (it *iteratorItemHashMap[C]) next() bool {
	step := func() { it.i_ = (it.i_ + it.stride_) & it.mask_ }

	step()
	for ; it.count_ < it.numActive_; step() {
		if it.states_[it.i_] > 0 {
			it.count_++
			return true
		}
	}
	return false
}
// getKey returns the key stored in the slot the iterator currently points at.
func (it *iteratorItemHashMap[C]) getKey() C {
	current := it.i_
	return it.keys_[current]
}
// getValue returns the value stored in the slot the iterator currently points at.
func (it *iteratorItemHashMap[C]) getValue() int64 {
	current := it.i_
	return it.values_[current]
}