frequencies/reverse_purge_long_hash_map.go (320 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package frequencies

import (
	"errors"
	"fmt"
	"math/bits"
	"strconv"
	"strings"

	"github.com/apache/datasketches-go/internal"
)

// reversePurgeLongHashMap is an open-addressing (linear probing) hash table
// from int64 keys to int64 values, stored in three parallel slices of equal
// length.
type reversePurgeLongHashMap struct {
	// lgLength is the base-2 logarithm of the slice length.
	lgLength int
	// loadThreshold is the slice length scaled by
	// reversePurgeLongHashMapLoadFactor; inserting into an empty slot is
	// refused once numActive exceeds it.
	loadThreshold int
	// keys holds the key of each slot; meaningful only where states is non-zero.
	keys []int64
	// values holds the value of each slot; meaningful only where states is non-zero.
	values []int64
	// states is 0 for an empty slot; otherwise it is the probe count (starting
	// at 1 for the home slot) recorded when the entry was placed.
	states []int16
	// numActive is the number of occupied slots.
	numActive int
}

// iteratorLongHashMap walks the occupied slots of a reversePurgeLongHashMap,
// stepping through the table by a fixed stride.
type iteratorLongHashMap struct {
	// keys_, values_ and states_ are the map's slices (shared, not copied).
	keys_   []int64
	values_ []int64
	states_ []int16
	// numActive_ is the number of occupied slots the iterator will report.
	numActive_ int
	// stride_ is the step added to the index on every advance.
	stride_ int
	// mask_ is len(keys_)-1, used to wrap the index around the table.
	mask_ int
	// i_ is the current slot index.
	i_ int
	// count_ is the number of occupied slots reported so far.
	count_ int
}

const (
	// reversePurgeLongHashMapLoadFactor is the fraction of the table length
	// used to compute loadThreshold.
	reversePurgeLongHashMapLoadFactor = float64(0.75)
	// reversePurgeLongHashMapDriftLimit bounds the probe count; adjustOrPutValue
	// and hashDelete return an error once a probe sequence reaches it.
	reversePurgeLongHashMapDriftLimit = 1024 // originally noted as "used only in stress testing"
)

// newReversePurgeLongHashMap constructs a new reversePurgeLongHashMap.
// It will create arrays of length mapSize, which must be a power of two.
// This restriction was made to ensure fast hashing.
// The member loadThreshold is then set to the largest value that
// will not overload the hashFn table.
func newReversePurgeLongHashMap(mapSize int) (*reversePurgeLongHashMap, error) { lgLength, err := internal.ExactLog2(mapSize) if err != nil { return nil, fmt.Errorf("mapSize: %e", err) } loadThreshold := int(float64(mapSize) * reversePurgeLongHashMapLoadFactor) keys := make([]int64, mapSize) values := make([]int64, mapSize) states := make([]int16, mapSize) return &reversePurgeLongHashMap{ lgLength: lgLength, loadThreshold: loadThreshold, keys: keys, values: values, states: states, }, nil } func (r *reversePurgeLongHashMap) get(key int64) (int64, error) { probe := r.hashProbe(key) if r.states[probe] > 0 { if r.keys[probe] == key { return r.values[probe], nil } return 0, errors.New("key not found") } return 0, nil } // getCapacity returns the current capacity of the hashFn map (i.e., max number of keys that can be stored). func (r *reversePurgeLongHashMap) getCapacity() int { return r.loadThreshold } // adjustOrPutValue adjusts the value associated with the given key. // Increments the value mapped to the key if the key is present in the map. Otherwise, // the key is inserted with the putAmount. 
// // key the key of the value to increment // adjustAmount the amount by which to increment the value func (r *reversePurgeLongHashMap) adjustOrPutValue(key int64, adjustAmount int64) error { var ( arrayMask = len(r.keys) - 1 probe = hashFn(key) & int64(arrayMask) drift = 1 ) for r.states[probe] != 0 && r.keys[probe] != key { probe = (probe + 1) & int64(arrayMask) drift++ if drift >= reversePurgeLongHashMapDriftLimit { return errors.New("drift >= driftLimit") } } //found either an empty slot or the key if r.states[probe] == 0 { //found empty slot // adding the key and value to the table if r.numActive > r.loadThreshold { return errors.New("numActive >= loadThreshold") } r.keys[probe] = key r.values[probe] = adjustAmount r.states[probe] = int16(drift) //how far off we are r.numActive++ } else { //found the key, adjust the value if r.keys[probe] != key { return errors.New("keys[probe] != key") } r.values[probe] += adjustAmount } return nil } func (r *reversePurgeLongHashMap) resize(newSize int) error { oldKeys := r.keys oldValues := r.values oldStates := r.states r.keys = make([]int64, newSize) r.values = make([]int64, newSize) r.states = make([]int16, newSize) r.loadThreshold = int(float64(newSize) * reversePurgeLongHashMapLoadFactor) r.lgLength = bits.TrailingZeros(uint(newSize)) r.numActive = 0 err := error(nil) for i := 0; i < len(oldKeys) && err == nil; i++ { if oldStates[i] > 0 { err = r.adjustOrPutValue(oldKeys[i], oldValues[i]) } } return err } func (r *reversePurgeLongHashMap) purge(sampleSize int) int64 { limit := min(sampleSize, r.numActive) numSamples := 0 i := 0 samples := make([]int64, limit) for numSamples < limit { if r.states[i] > 0 { samples[numSamples] = r.values[i] numSamples++ } i++ } val := internal.QuickSelect(samples, 0, numSamples-1, limit/2) r.adjustAllValuesBy(-1 * val) r.keepOnlyPositiveCounts() return val } func (r *reversePurgeLongHashMap) serializeToString() string { var sb strings.Builder sb.WriteString(fmt.Sprintf("%d,%d,", 
r.numActive, len(r.keys))) for i := 0; i < len(r.keys); i++ { if r.states[i] != 0 { sb.WriteString(fmt.Sprintf("%d,%d,", r.keys[i], r.values[i])) } } return sb.String() } // adjustAllValuesBy adjust amount value by which to shift all values. Only keys corresponding to positive // values are retained. func (r *reversePurgeLongHashMap) adjustAllValuesBy(adjustAmount int64) { for i := len(r.keys); i > 0; { i-- r.values[i] += adjustAmount } } func (r *reversePurgeLongHashMap) keepOnlyPositiveCounts() { // Starting from the back, find the first empty cell, which marks a boundary between clusters. firstProbe := len(r.keys) - 1 for r.states[firstProbe] > 0 { firstProbe-- } //Work towards the front; delete any non-positive entries. for probe := firstProbe; probe > 0; { probe-- // When we find the next non-empty cell, we know we are at the high end of a cluster, // which is tracked by firstProbe. if r.states[probe] > 0 && r.values[probe] <= 0 { r.hashDelete(probe) //does the work of deletion and moving higher items towards the front. r.numActive-- } } //now work on the first cluster that was skipped. for probe := len(r.keys); probe-1 > firstProbe; { probe-- if r.states[probe] > 0 && r.values[probe] <= 0 { r.hashDelete(probe) r.numActive-- } } } func (r *reversePurgeLongHashMap) hashDelete(deleteProbe int) error { // Looks ahead in the table to search for another item to move to this location. 
// If none are found, the status is changed r.states[deleteProbe] = 0 //mark as empty drift := 1 arrayMask := len(r.keys) - 1 probe := (deleteProbe + drift) & arrayMask //map length must be a power of 2 // advance until you find a free location replacing locations as needed for r.states[probe] != 0 { if r.states[probe] > int16(drift) { // move current element r.keys[deleteProbe] = r.keys[probe] r.values[deleteProbe] = r.values[probe] r.states[deleteProbe] = r.states[probe] - int16(drift) // marking the current probe location as deleted r.states[probe] = 0 drift = 0 deleteProbe = probe } probe = (probe + 1) & arrayMask drift++ //only used for theoretical analysis if drift >= reversePurgeLongHashMapDriftLimit { return errors.New("drift >= driftLimit") } } return nil } func deserializeReversePurgeLongHashMapFromString(string string) (*reversePurgeLongHashMap, error) { tokens := strings.Split(string, ",") if len(tokens) < 2 { return nil, errors.New("len(tokens) < 2") } numActive, err := strconv.Atoi(tokens[0]) if err != nil { return nil, err } length, err := strconv.Atoi(tokens[1]) if err != nil { return nil, err } table, err := newReversePurgeLongHashMap(length) if err != nil { return nil, err } j := 2 for i := 0; i < numActive && err == nil; i++ { key, err := strconv.Atoi(tokens[j]) if err != nil { return nil, err } value, err := strconv.Atoi(tokens[j+1]) if err != nil { return nil, err } err = table.adjustOrPutValue(int64(key), int64(value)) if err != nil { return nil, err } j += 2 } return table, nil } func deserializeFromStringArray(tokens []string) (*reversePurgeLongHashMap, error) { ignore := strPreambleTokens numActive, _ := strconv.ParseUint(tokens[ignore], 10, 32) length, _ := strconv.ParseUint(tokens[ignore+1], 10, 32) hashMap, err := newReversePurgeLongHashMap(int(length)) if err != nil { return nil, err } j := 2 + ignore for i := 0; i < int(numActive); i++ { key, err := strconv.ParseUint(tokens[j], 10, 64) if err != nil { return nil, err } value, err := 
strconv.ParseUint(tokens[j+1], 10, 64) if err != nil { return nil, err } err = hashMap.adjustOrPutValue(int64(key), int64(value)) if err != nil { return nil, err } j += 2 } return hashMap, nil } func (r *reversePurgeLongHashMap) getActiveValues() []int64 { if r.numActive == 0 { return nil } returnValues := make([]int64, 0, r.numActive) for i := 0; i < len(r.values); i++ { if r.states[i] > 0 { //isActive returnValues = append(returnValues, r.values[i]) } } return returnValues } func (r *reversePurgeLongHashMap) getActiveKeys() []int64 { if r.numActive == 0 { return nil } returnValues := make([]int64, 0, r.numActive) for i := 0; i < len(r.keys); i++ { if r.states[i] > 0 { //isActive returnValues = append(returnValues, r.keys[i]) } } return returnValues } func (s *reversePurgeLongHashMap) iterator() *iteratorLongHashMap { return newIteratorLong(s.keys, s.values, s.states, s.numActive) } func (s *reversePurgeLongHashMap) hashProbe(key int64) int { arrayMask := len(s.keys) - 1 probe := int(hashFn(key)) & arrayMask for s.states[probe] > 0 && s.keys[probe] != key { probe = (probe + 1) & arrayMask } return probe } func (s *reversePurgeLongHashMap) String() string { var sb strings.Builder sb.WriteString("ReversePurgeLongHashMap:\n") sb.WriteString(fmt.Sprintf(" %12s:%11s%20s %s\n", "Index", "States", "Values", "Keys")) for i := 0; i < len(s.keys); i++ { if s.states[i] <= 0 { continue } sb.WriteString(fmt.Sprintf(" %12d:%11d%20d %d\n", i, s.states[i], s.values[i], s.keys[i])) } return sb.String() } func newIteratorLong(keys []int64, values []int64, states []int16, numActive int) *iteratorLongHashMap { stride := int(uint64(float64(len(keys))*internal.InverseGolden) | 1) return &iteratorLongHashMap{ keys_: keys, values_: values, states_: states, numActive_: numActive, stride_: stride, mask_: len(keys) - 1, i_: -stride, } } func (i *iteratorLongHashMap) next() bool { i.i_ = (i.i_ + i.stride_) & i.mask_ for i.count_ < i.numActive_ { if i.states_[i.i_] > 0 { i.count_++ return 
true } i.i_ = (i.i_ + i.stride_) & i.mask_ } return false } func (i *iteratorLongHashMap) getKey() int64 { return i.keys_[i.i_] } func (i *iteratorLongHashMap) getValue() int64 { return i.values_[i.i_] }