memstore/cuckoo_index.go (446 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package memstore
import (
"github.com/uber/aresdb/cgoutils"
"math"
"math/rand"
"unsafe"
memCom "github.com/uber/aresdb/memstore/common"
"github.com/uber/aresdb/utils"
"sync"
"time"
)
const (
// size of the stash
stashSize = 4
// only when load factor is larger than resizeThreshold
// we do resize without rehashing first
resizeThreshold = 0.9
// growth percentage for each resize
resizeFactor float32 = 0.2
// reserve signature 0 to indicate the slot is empty
emptySignature = 0
minSignature = 1
// offsets for bucket
// the data layout in a bucket is the following manner
// RecordID[8]|signature[8]|eventTime[8](optional)|key[8]
offsetToSignature = memCom.BucketSize * memCom.RecordIDBytes
offsetToEventTime = offsetToSignature + memCom.BucketSize*1
offsetToKeyWithEventTime = offsetToEventTime + memCom.BucketSize*4
offsetToKeyWithoutEventTime = offsetToEventTime
)
type stashEntry struct {
isValid bool
eventTime uint32
key memCom.Key
value memCom.RecordID
}
type hashResult struct {
bucket unsafe.Pointer
signature uint8
}
// CuckooIndex is a implementation of Hash Index using Cuckoo Hashing algorithm
// Lazy expiration is used to invalidate expired items
// CuckooIndex is not threadsafe
type CuckooIndex struct {
// number of bytes of a key
keyBytes int
// the size in bytes each bucket takes
bucketBytes int
// number of buckets
numBuckets int
// number of entries in bucket
numBucketEntries uint
// number of entries in stash
numStashEntries uint
// maxTrials when do evict and add
maxTrials int
// mark whether it is a fact vs dimension table hash index
hasEventTime bool
// bucket array
// the array is a byte array allocated in c
// the data layout in a bucket is the following manner
// RecordID[8]|signature[8]|eventTime[8](optional)|key[8]
buckets unsafe.Pointer
// stash is a special bucket, the only difference is its size
// use of stash is to reduce the probability of rehashing
// Note stash memory is allocated/de-allocated together with buckets
// stash also have 8 slots, only 4 (configurable) of them are used
stash unsafe.Pointer
// extra stash entry in go struct
// act as a temporary place before resize
staging *stagingEntry
// seeds holds the hash function seeds
// use different seeds to generate different hash values
seeds [memCom.NumHashes]uint32
// eventTimeCutoff record the smallest timestamp that was
eventTimeCutoff uint32
rand *rand.Rand
// report change of unmanaged memory.
hostMemoryManager memCom.HostMemoryManager
// mutex protects internal buffer for GPU transfer
transferLock sync.RWMutex
}
type stagingEntry struct {
eventTime uint32
key memCom.Key
value memCom.RecordID
}
func getDefaultInitNumBuckets() int {
if utils.IsTest() {
return 1000
}
return 1000000
}
// Size returns the current number of items stored in the hash table
// including expired items yet not known to the system
func (c *CuckooIndex) Size() uint {
return c.numBucketEntries + c.numStashEntries
}
// Update updates a key with a new recordID. Return whether key exists in the primary key or not.
func (c *CuckooIndex) Update(key memCom.Key, value memCom.RecordID) bool {
c.transferLock.Lock()
defer c.transferLock.Unlock()
keyPtr := unsafe.Pointer(&key[0])
for hashIndex := 0; hashIndex < memCom.NumHashes; hashIndex++ {
hashResult := c.hash(keyPtr, hashIndex)
for i := 0; i < memCom.BucketSize; i++ {
existingSignature := *c.getSignature(hashResult.bucket, i)
if !c.recordExpired(hashResult.bucket, i) &&
existingSignature == hashResult.signature &&
utils.MemEqual(c.getKey(hashResult.bucket, i), keyPtr, c.keyBytes) {
*c.getRecordID(hashResult.bucket, i) = value
return true
}
}
}
for i := 0; i < stashSize; i++ {
if !c.isEmpty(c.stash, i) &&
!c.recordExpired(c.stash, i) &&
utils.MemEqual(c.getKey(c.stash, i), keyPtr, c.keyBytes) {
*c.getRecordID(c.stash, i) = value
return true
}
}
return false
}
// Find looks up a record given key
func (c *CuckooIndex) Find(key memCom.Key) (memCom.RecordID, bool) {
keyPtr := unsafe.Pointer(&key[0])
for hashIndex := 0; hashIndex < memCom.NumHashes; hashIndex++ {
hashResult := c.hash(keyPtr, hashIndex)
for i := 0; i < memCom.BucketSize; i++ {
existingSignature := *c.getSignature(hashResult.bucket, i)
if !c.recordExpired(hashResult.bucket, i) &&
existingSignature == hashResult.signature &&
utils.MemEqual(c.getKey(hashResult.bucket, i), keyPtr, c.keyBytes) {
return *c.getRecordID(hashResult.bucket, i), true
}
}
}
for i := 0; i < stashSize; i++ {
if !c.isEmpty(c.stash, i) &&
!c.recordExpired(c.stash, i) &&
utils.MemEqual(c.getKey(c.stash, i), keyPtr, c.keyBytes) {
return *c.getRecordID(c.stash, i), true
}
}
return memCom.RecordID{}, false
}
// Capacity returns how many items current primary key can hold.
func (c *CuckooIndex) Capacity() uint {
return uint(c.numBuckets * memCom.BucketSize)
}
// AllocatedBytes returns the allocated size of primary key in bytes.
func (c *CuckooIndex) AllocatedBytes() uint {
return c.allocatedBytes()
}
// FindOrInsert find the existing key or insert a new (key, value) pair
func (c *CuckooIndex) FindOrInsert(key memCom.Key, value memCom.RecordID, eventTime uint32) (existingFound bool, recordID memCom.RecordID, err error) {
c.transferLock.Lock()
defer c.transferLock.Unlock()
if c.eventTimeExpired(eventTime) {
return false, memCom.RecordID{}, utils.StackError(nil, "Stale Value, eventTimeCutOff: %d, getEventTime Inserted: %d", c.eventTimeCutoff, eventTime)
}
existingFound, added, recordID, hashResults := c.findOrAddNew(unsafe.Pointer(&key[0]), value, eventTime)
if existingFound || added {
return existingFound, recordID, nil
}
if c.cuckooAdd(unsafe.Pointer(&key[0]), value, eventTime, hashResults) {
return false, value, nil
}
for i := 1; ; i++ {
if c.resize(float32(i) * resizeFactor) {
return false, value, nil
}
}
}
// Delete will delete a item with given key
func (c *CuckooIndex) Delete(key memCom.Key) {
c.transferLock.Lock()
defer c.transferLock.Unlock()
var hashResults [memCom.NumHashes]hashResult
for hashIndex := 0; hashIndex < memCom.NumHashes; hashIndex++ {
hashResult := c.hash(unsafe.Pointer(&key[0]), hashIndex)
hashResults[hashIndex] = hashResult
for i := 0; i < memCom.BucketSize; i++ {
if *c.getSignature(hashResult.bucket, i) == hashResult.signature &&
utils.MemEqual(c.getKey(hashResult.bucket, i), unsafe.Pointer(&key[0]), c.keyBytes) {
c.numBucketEntries--
*c.getSignature(hashResult.bucket, i) = emptySignature
return
}
}
}
for i := 0; i < stashSize; i++ {
if !c.isEmpty(c.stash, i) &&
utils.MemEqual(c.getKey(c.stash, i), unsafe.Pointer(&key[0]), c.keyBytes) {
*c.getSignature(c.stash, i) = emptySignature
c.numStashEntries--
return
}
}
}
// UpdateEventTimeCutoff updates eventTimeCutoff
func (c *CuckooIndex) UpdateEventTimeCutoff(cutoff uint32) {
c.eventTimeCutoff = cutoff
}
// GetEventTimeCutoff returns the cutoff event time.
func (c *CuckooIndex) GetEventTimeCutoff() uint32 {
return c.eventTimeCutoff
}
// LockForTransfer locks primary key for transfer and returns PrimaryKeyData
func (c *CuckooIndex) LockForTransfer() memCom.PrimaryKeyData {
c.transferLock.RLock()
return memCom.PrimaryKeyData{
Data: c.buckets,
// numBuckets plus stash bucket
NumBytes: c.bucketBytes * (c.numBuckets + 1),
Seeds: c.seeds,
KeyBytes: c.keyBytes,
NumBuckets: c.numBuckets,
}
}
// UnlockAfterTransfer release transfer lock
func (c *CuckooIndex) UnlockAfterTransfer() {
c.transferLock.RUnlock()
}
func (c *CuckooIndex) hash(key unsafe.Pointer, index int) hashResult {
hashValue := utils.Murmur3Sum32(key, c.keyBytes, c.seeds[index])
bucketIndex := hashValue % uint32(c.numBuckets)
bucket := utils.MemAccess(c.buckets, int(bucketIndex)*c.bucketBytes)
signature := c.extractSignatureByte(hashValue)
return hashResult{
bucket: bucket,
signature: signature,
}
}
func (c *CuckooIndex) generateRandomSeeds() {
for i := range c.seeds {
c.seeds[i] = c.rand.Uint32()
}
}
func (c *CuckooIndex) loadFactor() float64 {
return float64(c.numBucketEntries+c.numStashEntries) / float64(c.numBuckets*memCom.BucketSize)
}
// extractSignatureByte get the most significant byte from the hash value
// this is to avoid comparison of byte array as much as possible, which is expensive
func (c *CuckooIndex) extractSignatureByte(hashValue uint32) uint8 {
signature := uint8(hashValue >> (32 - 8))
if signature < minSignature {
signature = minSignature
}
return signature
}
func (c *CuckooIndex) getSignature(bucket unsafe.Pointer, index int) *uint8 {
return (*uint8)(utils.MemAccess(bucket, offsetToSignature+index))
}
func (c *CuckooIndex) getRecordID(bucket unsafe.Pointer, index int) *memCom.RecordID {
return (*memCom.RecordID)(utils.MemAccess(bucket, index*memCom.RecordIDBytes))
}
// call should be aware there is no eventime present, this method will return incorrect
func (c *CuckooIndex) getEventTime(bucket unsafe.Pointer, index int) *uint32 {
return (*uint32)(utils.MemAccess(bucket, offsetToEventTime+index*4))
}
func (c *CuckooIndex) getKey(bucket unsafe.Pointer, index int) unsafe.Pointer {
if !c.hasEventTime {
return utils.MemAccess(bucket, offsetToKeyWithoutEventTime+index*c.keyBytes)
}
return utils.MemAccess(bucket, offsetToKeyWithEventTime+index*c.keyBytes)
}
func (c *CuckooIndex) isEmpty(bucket unsafe.Pointer, index int) bool {
return *c.getSignature(bucket, index) == emptySignature
}
func (c *CuckooIndex) recordExpired(bucket unsafe.Pointer, index int) bool {
return c.hasEventTime && c.eventTimeExpired(*c.getEventTime(bucket, index))
}
func (c *CuckooIndex) eventTimeExpired(eventTime uint32) bool {
return c.eventTimeCutoff > eventTime
}
// randomSwap randomly pick a bucket position and swap with the value
func (c *CuckooIndex) randomSwap(key unsafe.Pointer, recordID *memCom.RecordID, eventTime *uint32, hashResults [memCom.NumHashes]hashResult) {
hashResult := hashResults[c.rand.Intn(memCom.NumHashes)]
slotIndex := c.rand.Intn(memCom.BucketSize)
*c.getRecordID(hashResult.bucket, slotIndex), *recordID = *recordID, *c.getRecordID(hashResult.bucket, slotIndex)
*c.getSignature(hashResult.bucket, slotIndex) = hashResult.signature
utils.MemSwap(c.getKey(hashResult.bucket, slotIndex), key, c.keyBytes)
if c.hasEventTime {
*c.getEventTime(hashResult.bucket, slotIndex), *eventTime = *eventTime, *c.getEventTime(hashResult.bucket, slotIndex)
}
}
// addNew only attempts to add new item into buckets, but not stash
// and assume there is no existing item
// and will not do the cuckoo process when the process fail
func (c *CuckooIndex) addNew(key unsafe.Pointer, recordID memCom.RecordID, eventTime uint32) (added bool, hashResults [memCom.NumHashes]hashResult) {
for hashIndex := 0; hashIndex < memCom.NumHashes; hashIndex++ {
hashResult := c.hash(key, hashIndex)
hashResults[hashIndex] = hashResult
for i := 0; i < memCom.BucketSize; i++ {
if c.isEmpty(hashResult.bucket, i) {
c.insertBucket(key, recordID, hashResult.signature, eventTime, hashResult.bucket, i)
c.numBucketEntries++
added = true
return
} else if c.recordExpired(hashResult.bucket, i) {
c.insertBucket(key, recordID, hashResult.signature, eventTime, hashResult.bucket, i)
added = true
return
}
}
}
return
}
// find existing or add new item to available slot
func (c *CuckooIndex) findOrAddNew(key unsafe.Pointer, value memCom.RecordID, eventTime uint32) (existingFound bool, added bool, recordID memCom.RecordID, hashResults [memCom.NumHashes]hashResult) {
indexToInsert := -1
isInsert := false
var bucketToInsert unsafe.Pointer
var signatureToInsert uint8
// look for existing record in buckets with all hash functions
// mark potential slot to insert new record
for hashIndex := 0; hashIndex < memCom.NumHashes; hashIndex++ {
hashResult := c.hash(key, hashIndex)
hashResults[hashIndex] = hashResult
for i := 0; i < memCom.BucketSize; i++ {
isEmpty := c.isEmpty(hashResult.bucket, i)
if isEmpty || c.recordExpired(hashResult.bucket, i) {
if indexToInsert < 0 {
indexToInsert = i
bucketToInsert = hashResult.bucket
signatureToInsert = hashResult.signature
isInsert = isEmpty
}
} else if *c.getSignature(hashResult.bucket, i) == hashResult.signature && utils.MemEqual(c.getKey(hashResult.bucket, i), key, c.keyBytes) {
existingFound = true
recordID = *c.getRecordID(hashResult.bucket, i)
return
}
}
}
// look for existing record in stash
for i := 0; i < stashSize; i++ {
if !c.recordExpired(c.stash, i) && !c.isEmpty(c.stash, i) &&
utils.MemEqual(c.getKey(c.stash, i), key, c.keyBytes) {
existingFound = true
recordID = *c.getRecordID(c.stash, i)
return
}
}
if indexToInsert >= 0 {
c.insertBucket(key, value, signatureToInsert, eventTime, bucketToInsert, indexToInsert)
if isInsert {
c.numBucketEntries++
}
added = true
recordID = value
}
return
}
// randomly evict existing item with conflict hash and reinsert
func (c *CuckooIndex) cuckooAdd(key unsafe.Pointer, recordID memCom.RecordID, eventTime uint32, hashResults [memCom.NumHashes]hashResult) bool {
insertKey := make(memCom.Key, c.keyBytes)
utils.MemCopy(unsafe.Pointer(&insertKey[0]), key, c.keyBytes)
for trial := 0; trial < c.maxTrials; trial++ {
c.randomSwap(unsafe.Pointer(&insertKey[0]), &recordID, &eventTime, hashResults)
added, evictHashResults := c.addNew(unsafe.Pointer(&insertKey[0]), recordID, eventTime)
if added {
return true
}
hashResults = evictHashResults
}
// insert to stash
for i := 0; i < stashSize; i++ {
if c.isEmpty(c.stash, i) {
c.numStashEntries++
c.insertBucket(unsafe.Pointer(&insertKey[0]), recordID, minSignature, eventTime, c.stash, i)
return true
} else if c.recordExpired(c.stash, i) {
c.insertBucket(unsafe.Pointer(&insertKey[0]), recordID, minSignature, eventTime, c.stash, i)
return true
}
}
// save swapped out record to staging area for resizing
if c.staging == nil {
staging := stagingEntry{}
if c.hasEventTime {
staging.eventTime = eventTime
}
staging.value = recordID
staging.key = make(memCom.Key, c.keyBytes)
copy(staging.key, insertKey)
c.staging = &staging
}
return false
}
// insert will insert without find existing item
func (c *CuckooIndex) insert(key unsafe.Pointer, v memCom.RecordID, eventTime uint32) bool {
added, hashResults := c.addNew(key, v, eventTime)
if !added {
return c.cuckooAdd(key, v, eventTime, hashResults)
}
return true
}
func (c *CuckooIndex) insertBucket(key unsafe.Pointer, recordID memCom.RecordID, signature uint8, eventTime uint32, bucket unsafe.Pointer, index int) {
utils.MemCopy(c.getKey(bucket, index), key, c.keyBytes)
*c.getSignature(bucket, index) = signature
*c.getRecordID(bucket, index) = recordID
if c.hasEventTime {
*c.getEventTime(bucket, index) = eventTime
}
}
func (c *CuckooIndex) getMaxTrials() int {
return int(1+math.Log2(float64(c.numBuckets))) * 2
}
// resize the hast table by growFactor
func (c *CuckooIndex) resize(resizeFactor float32) (ok bool) {
newIndex := newCuckooIndex(
c.keyBytes,
c.hasEventTime,
int(float32(c.numBuckets)*float32(1+resizeFactor)),
c.hostMemoryManager,
)
// insert existing keys to new index
for i := 0; i < c.numBuckets+1; i++ {
var bucket unsafe.Pointer
numElements := memCom.BucketSize
if i == c.numBuckets {
bucket = c.stash
numElements = stashSize
} else {
bucket = utils.MemAccess(c.buckets, i*c.bucketBytes)
}
for j := 0; j < numElements; j++ {
if c.isEmpty(bucket, j) || c.recordExpired(bucket, j) {
continue
}
eventTime := *c.getEventTime(bucket, j)
key := c.getKey(bucket, j)
recordID := *c.getRecordID(bucket, j)
if newIndex.insert(key, recordID, eventTime) {
continue
}
newIndex.Destruct()
return false
}
}
if c.staging != nil {
if !newIndex.insert(unsafe.Pointer(&c.staging.key[0]), c.staging.value, c.staging.eventTime) {
newIndex.Destruct()
return false
}
newIndex.staging = nil
}
// copy to current index. note transferLock is reused.
cgoutils.HostFree(c.buckets)
newIndex.hostMemoryManager.ReportUnmanagedSpaceUsageChange(int64(-c.allocatedBytes()))
c.numBuckets = newIndex.numBuckets
c.rand = newIndex.rand
c.seeds = newIndex.seeds
c.buckets = newIndex.buckets
c.staging = newIndex.staging
c.stash = newIndex.stash
c.numBucketEntries = newIndex.numBucketEntries
c.numStashEntries = newIndex.numStashEntries
c.maxTrials = newIndex.maxTrials
newIndex = nil
return true
}
func (c *CuckooIndex) allocatedBytes() uint {
return uint(c.numBuckets * c.bucketBytes)
}
func (c *CuckooIndex) allocate() {
totalBucketBytes := int(c.allocatedBytes())
// allocate buckets plus stash
c.buckets = cgoutils.HostAlloc(totalBucketBytes + c.bucketBytes)
c.stash = utils.MemAccess(c.buckets, totalBucketBytes)
}
// newCuckooIndex create a cuckoo hashing index
func newCuckooIndex(keyBytes int, hasEventTime bool, initNumBuckets int,
hostMemoryManager memCom.HostMemoryManager) *CuckooIndex {
if initNumBuckets <= 0 {
initNumBuckets = getDefaultInitNumBuckets()
}
cuckooIndex := &CuckooIndex{
rand: rand.New(rand.NewSource(time.Now().Unix())),
keyBytes: keyBytes,
hasEventTime: hasEventTime,
numBuckets: initNumBuckets,
hostMemoryManager: hostMemoryManager,
transferLock: sync.RWMutex{},
}
// recordIDBytes + keyBytes + signature (1 byte)
var cellBytes = memCom.RecordIDBytes + cuckooIndex.keyBytes + 1
// plus eventTime (4 bytes)
if cuckooIndex.hasEventTime {
cellBytes += 4
}
cuckooIndex.bucketBytes = memCom.BucketSize * cellBytes
hostMemoryManager.ReportUnmanagedSpaceUsageChange(int64(cuckooIndex.allocatedBytes()))
cuckooIndex.allocate()
cuckooIndex.maxTrials = cuckooIndex.getMaxTrials()
cuckooIndex.generateRandomSeeds()
return cuckooIndex
}
// Destruct frees all allocated memory
func (c *CuckooIndex) Destruct() {
c.transferLock.Lock()
defer c.transferLock.Unlock()
bytes := c.allocatedBytes()
cgoutils.HostFree(c.buckets)
c.buckets = nil
c.hostMemoryManager.ReportUnmanagedSpaceUsageChange(-int64(bytes))
}
// NewPrimaryKey create a primary key data structure
// params:
// 1. keyBytes, number of bytes of key
// 2. hasEventTime determine whether primary key should record event time for expiration
// 3. initNumBuckets determines the starting number of buckets, setting to 0 to use default
func NewPrimaryKey(keyBytes int, hasEventTime bool, initNumBuckets int,
hostMemoryManager memCom.HostMemoryManager) memCom.PrimaryKey {
return newCuckooIndex(keyBytes, hasEventTime, initNumBuckets, hostMemoryManager)
}