qf.go (394 lines of code) (raw):
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
// Package qf implements a quotient filter data
// structure which supports:
// 1. external storage per entry
// 2. dynamic doubling
// 3. packed or unpacked representations (choose time or space)
// 4. a user-overridable hash function (default is murmur)
package qf
import (
"fmt"
"math"
"unsafe"
)
// MaxLoadingFactor specifies the boundary at which we will double
// the quotient filter hash table and also is used to initially size
// the table. The per-filter entry limit is derived from it as
// ceil(size * MaxLoadingFactor); an insert that finds the entry count
// at or above that limit doubles the table first.
const MaxLoadingFactor = 0.65
// Filter is a quotient filter representation. It keeps one slot per
// table position (three bookkeeping bits plus a remainder) and,
// optionally, an external value per slot.
type Filter struct {
	// entries is the running count of stored entries, as reported by Len.
	entries uint64
	// size is the number of slots in the table (1 << qBits).
	size uint64
	// filter holds the slot data: bookkeeping bits and remainder.
	filter Vector
	// storage holds the external value for each slot. It is only
	// allocated when config.BitsOfStoragePerEntry is non-zero.
	storage Vector
	// rBits is the number of remainder bits (bitsPerWord - qBits).
	rBits uint
	// qBits is the number of quotient bits.
	qBits uint
	// rMask selects the low rBits bits of a hash value.
	rMask uint64
	// maxEntries is the entry count at which an insert doubles the table.
	maxEntries uint64
	// config is the configuration the filter was built from.
	config Config
	// hashfn hashes keys to 64 bit values.
	hashfn HashFn
	// allocfn allocates the vectors backing filter and storage.
	allocfn VectorAllocateFn
}
// Len returns the number of entries in the quotient filter.
// It reports the filter's running entry counter and does not rescan
// the slots.
func (qf *Filter) Len() uint64 {
	return qf.entries
}
// DebugDump prints a textual representation of the quotient filter
// to stdout. A one line summary is always printed; when full is true
// every non-empty slot is listed as well (slot index, the occupied,
// continuation and shifted bits, the remainder in hex and the
// externally stored value), with each stretch of empty slots collapsed
// into a single " ..." line.
func (qf *Filter) DebugDump(full bool) {
	fmt.Printf("\nquotient filter is %d large (%d q bits) with %d entries (loaded %0.3f)\n",
		qf.size, qf.qBits, qf.entries, float64(qf.entries)/float64(qf.size))
	if !full {
		return
	}

	// bit renders a bookkeeping flag as 0 or 1.
	bit := func(set bool) int {
		if set {
			return 1
		}
		return 0
	}

	fmt.Printf(" bucket O C S remainder->\n")
	gapPending := false
	for slot := uint64(0); slot < qf.size; slot++ {
		sd := qf.read(slot)
		if sd.empty() {
			gapPending = true
			continue
		}
		if gapPending {
			fmt.Printf(" ...\n")
			gapPending = false
		}
		var stored uint64
		if qf.storage != nil {
			stored = qf.storage.Get(slot)
		}
		fmt.Printf("%8d %d %d %d %x (%d)\n",
			slot, bit(sd.occupied()), bit(sd.continuation()), bit(sd.shifted()), sd.r(), stored)
	}
	// the table ended on a stretch of empty slots
	if gapPending {
		fmt.Printf(" ...\n")
	}
}
// eachHashValue walks every slot of the quotient filter once and calls
// cb for each stored entry. The first callback argument is the hash
// value rebuilt from the entry's quotient and remainder, the second is
// the slot the entry currently lives in.
func (qf *Filter) eachHashValue(cb func(uint64, uint64)) {
	// begin at a slot that is not shifted, so that no run is entered
	// half way through
	first := uint64(0)
	for qf.read(first).shifted() {
		right(&first, qf.size)
	}
	// the walk is circular and ends on the slot just left of first
	last := first
	left(&last, qf.size)

	// quotients whose runs have not been fully visited yet, oldest first
	var pending []uint64
	for slot := first; ; right(&slot, qf.size) {
		sd := qf.read(slot)
		// a slot that does not continue a run finishes the oldest one
		if len(pending) > 0 && !sd.continuation() {
			pending = pending[1:]
		}
		if sd.occupied() {
			pending = append(pending, slot)
		}
		if len(pending) != 0 {
			quotient := pending[0]
			cb((quotient<<qf.rBits)|(sd.r()&qf.rMask), slot)
		}
		if slot == last {
			return
		}
	}
}
// New allocates a new quotient filter with default initial
// sizing and no external storage configured. It is equivalent to
// calling NewWithConfig with a zero Config.
func New() *Filter {
	return NewWithConfig(Config{})
}
// NewWithConfig allocates a new quotient filter based on the
// supplied configuration. A nil c.HashFn selects murmurhash64 and
// c.BitPacked selects the bit packed vector allocator instead of the
// unpacked one. It panics if the computed entry limit exceeds the
// table size.
func NewWithConfig(c Config) *Filter {
	if c.HashFn == nil {
		c.HashFn = murmurhash64
	}

	qf := &Filter{hashfn: c.HashFn, allocfn: UnpackedVectorAllocate}
	if c.BitPacked {
		qf.allocfn = BitPackedVectorAllocate
	}

	qf.initForQuotientBits(uint(c.QBits()))
	// the stored config carries the defaulted hash function
	qf.config = c
	qf.allocStorage()

	if qf.maxEntries > qf.size {
		panic("internal inconsistency")
	}
	return qf
}
// BitsOfStoragePerEntry reports the configured external storage for the
// quotient filter, in bits per entry, as taken from the filter's Config.
func (qf *Filter) BitsOfStoragePerEntry() uint {
	return qf.config.BitsOfStoragePerEntry
}
// allocStorage allocates the vectors backing the filter for its current
// size: the slot vector, and the external value vector when the
// configuration asks for a non-zero number of storage bits per entry.
func (qf *Filter) allocStorage() {
	// three bookkeeping bits plus the remainder
	slotBits := 3 + bitsPerWord - qf.qBits
	qf.filter = qf.allocfn(slotBits, qf.size)

	valueBits := qf.config.BitsOfStoragePerEntry
	if valueBits == 0 {
		return
	}
	qf.storage = qf.allocfn(valueBits, qf.size)
}
// initForQuotientBits sets the sizing fields of the filter for a table
// with qBits quotient bits: the remainder width and mask, the number of
// slots, and the entry count at which the table doubles
// (ceil(size * MaxLoadingFactor)). It does not allocate any storage.
func (qf *Filter) initForQuotientBits(qBits uint) {
	qf.qBits = qBits
	// the package level helper already derives rBits and rMask from
	// qBits, so they are not recomputed here
	qf.rBits, qf.rMask, qf.size = initForQuotientBits(qBits)
	qf.maxEntries = uint64(math.Ceil(float64(qf.size) * MaxLoadingFactor))
}
// initForQuotientBits derives the table geometry for qBits quotient
// bits. It returns the number of remainder bits (bitsPerWord - qBits),
// a mask with the low rBits bits set, and the number of slots
// (1 << qBits).
func initForQuotientBits(qBits uint) (rBits uint, rMask, size uint64) {
	size = uint64(1) << qBits
	rBits = bitsPerWord - qBits
	// A shift by 64 or more yields 0 for unsigned operands, so the
	// subtraction wraps to all ones when rBits is the full word width.
	rMask = uint64(1)<<rBits - 1
	return rBits, rMask, size
}
// slotData is the content of one filter slot: three bookkeeping bits in
// the low bits and the remainder in the bits above them.
type slotData uint64

// Bookkeeping bits kept in the low three bits of a slot.
const (
	occupiedMask slotData = 1 << iota
	continuationMask
	shiftedMask
	bookkeepingMask = occupiedMask | continuationMask | shiftedMask
)

// empty reports whether none of the bookkeeping bits are set.
func (sd slotData) empty() bool {
	return sd&bookkeepingMask == 0
}

// occupied reports whether the occupied bit is set.
func (sd slotData) occupied() bool {
	return sd&occupiedMask != 0
}

// setOccupied sets or clears the occupied bit.
func (sd *slotData) setOccupied(on bool) {
	if !on {
		*sd &^= occupiedMask
		return
	}
	*sd |= occupiedMask
}

// continuation reports whether the continuation bit is set.
func (sd slotData) continuation() bool {
	return sd&continuationMask != 0
}

// setContinuation sets or clears the continuation bit.
func (sd *slotData) setContinuation(on bool) {
	if !on {
		*sd &^= continuationMask
		return
	}
	*sd |= continuationMask
}

// shifted reports whether the shifted bit is set.
func (sd slotData) shifted() bool {
	return sd&shiftedMask != 0
}

// setShifted sets or clears the shifted bit.
func (sd *slotData) setShifted(on bool) {
	if !on {
		*sd &^= shiftedMask
		return
	}
	*sd |= shiftedMask
}

// r returns the remainder stored above the bookkeeping bits.
func (sd slotData) r() uint64 {
	return uint64(sd) >> 3
}

// setR replaces the remainder and leaves the bookkeeping bits as they
// are. The top three bits of r do not fit and are dropped.
func (sd *slotData) setR(r uint64) {
	flags := *sd & bookkeepingMask
	*sd = slotData(r<<3) | flags
}
// read returns the slot data (bookkeeping bits and remainder) held in
// the filter vector at the given slot.
func (qf *Filter) read(slot uint64) slotData {
	return slotData(qf.filter.Get(slot))
}
// write stores sd in the filter vector at the given slot.
func (qf *Filter) write(slot uint64, sd slotData) {
	qf.filter.Set(slot, uint64(sd))
}
// swap hands sd to the filter vector's Swap for the given slot and
// returns whatever Swap returns, converted to slotData.
// NOTE(review): presumably that is the slot's previous content; confirm
// against the Vector implementation.
func (qf *Filter) swap(slot uint64, sd slotData) slotData {
	return slotData(qf.filter.Swap(slot, uint64(sd)))
}
// countEntries scans the whole table and returns the number of slots
// that are not empty.
func (qf *Filter) countEntries() (count uint64) {
	for slot := uint64(0); slot != qf.size; slot++ {
		if qf.read(slot).empty() {
			continue
		}
		count++
	}
	return count
}
// InsertStringWithValue stores the string key and an associated
// integer value in the quotient filter. It returns whether the
// key was already present in the quotient filter.
func (qf *Filter) InsertStringWithValue(s string, value uint64) bool {
	// View the string's bytes without copying them. The slice is built
	// with an explicit length and capacity; casting the string header
	// to a slice header would read the capacity from whatever memory
	// follows the string header. The bytes alias an immutable string,
	// so the hash function must treat them as read-only.
	// unsafe.StringData requires Go 1.20 or later.
	key := unsafe.Slice(unsafe.StringData(s), len(s))
	return qf.InsertWithValue(key, value)
}
// InsertString stores the string key in the quotient filter and
// returns whether this string was already present. The key is stored
// with an associated value of 0.
func (qf *Filter) InsertString(s string) bool {
	return qf.InsertStringWithValue(s, 0)
}
// InsertRawHash inserts a pre-calculated raw hash value with associated
// external data into the quotient filter. The hash must have been
// calculated with the very same algorithm the quotient filter uses
// internally, otherwise lookups will fail. This is a very low level
// insertion, use with care. It returns whether the hash was already
// present.
func (qf *Filter) InsertRawHash(hv uint64, value uint64) (update bool) {
	if qf.entries >= qf.maxEntries {
		qf.double()
	}
	// split only after a possible doubling, which changes rBits and rMask
	quotient, remainder := hv>>qf.rBits, hv&qf.rMask
	update = qf.insertByHash(quotient, remainder, value)
	return update
}
// double grows the table by one quotient bit. It builds a resized
// filter, re-inserts every stored hash value together with its external
// value, and then replaces the receiver with the result.
func (qf *Filter) double() {
	// start from a shallow copy so configuration, hash function and
	// allocator carry over
	grown := *qf
	grown.entries = 0
	grown.initForQuotientBits(qf.qBits + 1)
	grown.allocStorage()

	reinsert := func(hv, slot uint64) {
		var stored uint64
		if qf.storage != nil {
			stored = qf.storage.Get(slot)
		}
		// split the hash using the new, wider quotient
		grown.insertByHash(hv>>grown.rBits, hv&grown.rMask, stored)
	}
	qf.eachHashValue(reinsert)

	// shallow copy the grown filter back over the receiver
	*qf = grown
}
// InsertWithValue stores the key (byte slice) and an integer value in
// the quotient filter, doubling the table first when the entry limit
// has been reached. It returns whether a value already existed.
func (qf *Filter) InsertWithValue(v []byte, value uint64) (update bool) {
	if qf.entries >= qf.maxEntries {
		qf.double()
	}
	// hash only after a possible doubling, which changes rBits and rMask
	quotient, remainder := hash(qf.hashfn, v, qf.rBits, qf.rMask)
	update = qf.insertByHash(quotient, remainder, value)
	return update
}
// Insert stores the key (byte slice) in the quotient filter. It
// returns whether it already existed. The key is stored with an
// associated value of 0.
func (qf *Filter) Insert(v []byte) (update bool) {
	return qf.InsertWithValue(v, 0)
}
// insertByHash stores remainder dr under quotient dq together with the
// external value. It returns true when dr was already present in dq's
// run (only the stored value is updated) and false when a new entry was
// added. It never resizes the table; callers double it beforehand.
func (qf *Filter) insertByHash(dq, dr, value uint64) bool {
	sd := qf.read(dq)

	// case 1, the slot is empty: the entry goes into its canonical slot
	if sd.empty() {
		qf.entries++
		sd.setOccupied(true)
		sd.setR(dr)
		qf.write(dq, sd)
		if qf.storage != nil {
			qf.storage.Set(dq, value)
		}
		return false
	}

	// if the occupied bit is set for this dq, then we are
	// extending an existing run
	extendingRun := sd.occupied()

	// mark occupied if we are not extending a run
	if !extendingRun {
		sd.setOccupied(true)
		qf.write(dq, sd)
	}

	// ok, let's find the start
	runStart := dq
	if sd.shifted() {
		runStart = findStart(dq, qf.size, qf.filter.Get)
	}

	// now let's find the spot within the run. Remainders are compared
	// only against slots that belong to dq's own run: once the scan
	// steps past the end of the run, or when dq has no run yet, the
	// slot holds a foreign entry (or nothing) and an equal remainder
	// there is not a match.
	slot := runStart
	present := false
	if extendingRun {
		sd = qf.read(slot)
		for {
			if sd.empty() {
				break
			}
			if sd.r() >= dr {
				present = sd.r() == dr
				break
			}
			right(&slot, qf.size)
			sd = qf.read(slot)
			if !sd.continuation() {
				// walked off the end of the run
				break
			}
		}
	}

	// case 2, the value is already in the filter
	if present {
		// update value
		if qf.storage != nil {
			qf.storage.Set(slot, value)
		}
		return true
	}
	qf.entries++

	// case 3: we have to insert into an existing run
	// we are writing remainder <dr> into <slot> and pushing everything
	// from there on one slot to the right until an empty slot absorbs
	// the last displaced entry
	shifted := slot != dq
	continuation := slot != runStart
	for {
		// dr -> the remainder to write here; the external value moves
		// along with its remainder
		if qf.storage != nil {
			value = qf.storage.Swap(slot, value)
		}
		var updated slotData
		updated.setShifted(shifted)
		updated.setContinuation(continuation)
		old := qf.read(slot)
		// the occupied bit describes the slot's quotient, not the entry
		// stored in it, so it stays with the slot
		updated.setOccupied(old.occupied())
		updated.setR(dr)
		qf.write(slot, updated)
		if old.empty() {
			break
		}
		// the displaced entry continues a run if it already did, or if
		// it was the head of the run we just inserted in front of
		continuation = (slot == runStart && extendingRun) || old.continuation()
		dr = old.r()
		right(&slot, qf.size)
		shifted = true
	}
	return false
}
// right advances *i by one slot, wrapping to 0 once the result reaches
// size.
func right(i *uint64, size uint64) {
	next := *i + 1
	if next >= size {
		next = 0
	}
	*i = next
}
// left moves *i back by one slot, wrapping from 0 to size-1.
func left(i *uint64, size uint64) {
	if *i != 0 {
		*i--
		return
	}
	*i = size - 1
}
// findStart returns the slot at which the run for quotient dq begins
// (or would begin, if it holds no entries yet). It walks left from dq
// to the nearest slot that is not shifted, counting the runs that have
// to be passed, then walks right from dq until that many run
// boundaries have been seen. read fetches the raw content of a slot.
func findStart(dq uint64, size uint64, read readFn) uint64 {
	// runs is the number of runs up to and including dq's own; complete
	// is the number of slots seen that do not continue a run
	runs, complete := 1, 0

	// scan left to figure out how much to skip
	cursor := dq
	for {
		sd := slotData(read(cursor))
		if !sd.continuation() {
			complete++
		}
		if !sd.shifted() {
			break
		}
		if sd.occupied() {
			runs++
		}
		left(&cursor, size)
	}

	// scan right to find our run
	for complete < runs {
		right(&dq, size)
		if sd := slotData(read(dq)); !sd.continuation() {
			complete++
		}
	}
	return dq
}
// Contains returns whether the byte slice is contained
// within the quotient filter. It performs a Lookup and discards the
// stored value.
func (qf *Filter) Contains(v []byte) bool {
	found, _ := qf.Lookup(v)
	return found
}
// ContainsString returns whether the string is contained
// within the quotient filter. It performs a Lookup and discards the
// stored value.
func (qf *Filter) ContainsString(s string) bool {
	// View the string's bytes without copying them. The slice is built
	// with an explicit length and capacity; casting the string header
	// to a slice header would read the capacity from whatever memory
	// follows the string header. The bytes alias an immutable string,
	// so the hash function must treat them as read-only.
	// unsafe.StringData requires Go 1.20 or later.
	key := unsafe.Slice(unsafe.StringData(s), len(s))
	found, _ := qf.Lookup(key)
	return found
}
// Lookup searches for key and returns whether it
// exists, and the value stored with it (if any). The value is 0 when
// the filter has no external storage.
func (qf *Filter) Lookup(key []byte) (bool, uint64) {
	// stays nil when there is no external storage to read values from
	var values readFn
	if qf.storage != nil {
		values = qf.storage.Get
	}
	quotient, remainder := hash(qf.hashfn, key, qf.rBits, qf.rMask)
	return lookupByHash(quotient, remainder, qf.size, qf.filter.Get, values)
}
// lookupByHash searches the run of quotient dq for remainder dr in a
// table of size slots. read fetches the raw content of a slot and
// storage, when not nil, fetches the external value of a slot. It
// returns whether the remainder was found and the value stored with it
// (0 when storage is nil or nothing was found).
func lookupByHash(dq, dr, size uint64, read, storage readFn) (bool, uint64) {
	sd := slotData(read(dq))
	if !sd.occupied() {
		// no run exists for this quotient
		return false, 0
	}

	slot := dq
	if sd.shifted() {
		slot = findStart(dq, size, read)
		sd = slotData(read(slot))
	}

	// the scan stops at the first remainder larger than dr
	for sd.r() <= dr {
		if sd.r() == dr {
			if storage == nil {
				return true, 0
			}
			return true, storage(slot)
		}
		right(&slot, size)
		sd = slotData(read(slot))
		if !sd.continuation() {
			// reached the end of the run
			break
		}
	}
	return false, 0
}
// LookupString searches for key and returns whether it
// exists, and the value stored with it (if any).
func (qf *Filter) LookupString(key string) (bool, uint64) {
	// View the string's bytes without copying them. The slice is built
	// with an explicit length and capacity; casting the string header
	// to a slice header would read the capacity from whatever memory
	// follows the string header. The bytes alias an immutable string,
	// so the hash function must treat them as read-only.
	// unsafe.StringData requires Go 1.20 or later.
	raw := unsafe.Slice(unsafe.StringData(key), len(key))
	return qf.Lookup(raw)
}
// hash applies fn to v and splits the resulting hash value into a
// quotient (the bits above the low rBits bits) and a remainder (the
// bits selected by rMask).
func hash(fn HashFn, v []byte, rBits uint, rMask uint64) (q, r uint64) {
	hv := fn(v)
	q = hv >> rBits
	r = hv & rMask
	return q, r
}