common/collection/concurrent_tx_map.go (171 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package collection import ( "sync" "sync/atomic" ) const ( // nShards represents the number of shards // At any given point of time, there can only // be nShards number of concurrent writers to // the map at max nShards = 32 ) type ( // ShardedConcurrentTxMap is an implementation of // ConcurrentMap that internally uses multiple // sharded maps to increase parallelism ShardedConcurrentTxMap struct { shards [nShards]mapShard hashfn HashFunc size int32 initialCap int } // mapIteratorImpl represents an iterator type // for the concurrent map. mapIteratorImpl struct { stopCh chan struct{} dataCh chan *MapEntry } // mapShard represents a single instance // of thread safe map mapShard struct { sync.RWMutex items map[interface{}]interface{} } ) // NewShardedConcurrentTxMap returns an instance of ShardedConcurrentMap // // ShardedConcurrentMap is a thread safe map that maintains upto nShards // number of maps internally to allow nShards writers to be acive at the // same time. This map *does not* use re-entrant locks, so access to the // map during iterator can cause a dead lock. // // @param initialSz // // The initial size for the map // // @param hashfn // // The hash function to use for sharding func NewShardedConcurrentTxMap(initialCap int, hashfn HashFunc) ConcurrentTxMap { cmap := new(ShardedConcurrentTxMap) cmap.hashfn = hashfn cmap.initialCap = MaxInt(nShards, initialCap/nShards) return cmap } // Get returns the value corresponding to the key, if it exist func (cmap *ShardedConcurrentTxMap) Get(key interface{}) (interface{}, bool) { shard := cmap.getShard(key) var ok bool var value interface{} shard.RLock() if shard.items != nil { value, ok = shard.items[key] } shard.RUnlock() return value, ok } // Contains returns true if the key exist and false otherwise func (cmap *ShardedConcurrentTxMap) Contains(key interface{}) bool { _, ok := cmap.Get(key) return ok } // Put records the given key value mapping. Overwrites previous values func (cmap *ShardedConcurrentTxMap) Put(key interface{}, value interface{}) { shard := cmap.getShard(key) shard.Lock() cmap.lazyInitShard(shard) _, ok := shard.items[key] if !ok { atomic.AddInt32(&cmap.size, 1) } shard.items[key] = value shard.Unlock() } // PutIfNotExist records the mapping, if there is no mapping for this key already // Returns true if the mapping was recorded, false otherwise func (cmap *ShardedConcurrentTxMap) PutIfNotExist(key interface{}, value interface{}) bool { shard := cmap.getShard(key) var ok bool shard.Lock() cmap.lazyInitShard(shard) _, ok = shard.items[key] if !ok { shard.items[key] = value atomic.AddInt32(&cmap.size, 1) } shard.Unlock() return !ok } // Remove deletes the given key from the map func (cmap *ShardedConcurrentTxMap) Remove(key interface{}) { shard := cmap.getShard(key) shard.Lock() cmap.lazyInitShard(shard) _, ok := shard.items[key] if ok { delete(shard.items, key) atomic.AddInt32(&cmap.size, -1) } shard.Unlock() } // GetAndDo returns the value corresponding to the key, and apply fn to key value before return value // return (value, value exist or not, error when evaluation fn) func (cmap *ShardedConcurrentTxMap) GetAndDo(key interface{}, fn ActionFunc) (interface{}, bool, error) { shard := cmap.getShard(key) var value interface{} var ok bool var err error shard.Lock() if shard.items != nil { value, ok = shard.items[key] if ok { err = fn(key, value) } } shard.Unlock() return value, ok, err } // PutOrDo put the key value in the map, if key does not exists, otherwise, call fn with existing key and value // return (value, fn evaluated or not, error when evaluation fn) func (cmap *ShardedConcurrentTxMap) PutOrDo(key interface{}, value interface{}, fn ActionFunc) (interface{}, bool, error) { shard := cmap.getShard(key) var err error shard.Lock() cmap.lazyInitShard(shard) v, ok := shard.items[key] if !ok { shard.items[key] = value v = value atomic.AddInt32(&cmap.size, 1) } else { err = fn(key, v) } shard.Unlock() return v, ok, err } // RemoveIf deletes the given key from the map if fn return true func (cmap *ShardedConcurrentTxMap) RemoveIf(key interface{}, fn PredicateFunc) bool { shard := cmap.getShard(key) var removed bool shard.Lock() if shard.items != nil { value, ok := shard.items[key] if ok && fn(key, value) { removed = true delete(shard.items, key) atomic.AddInt32(&cmap.size, -1) } } shard.Unlock() return removed } func newMapIterator() *mapIteratorImpl { return &mapIteratorImpl{ dataCh: make(chan *MapEntry, 8), stopCh: make(chan struct{}), } } // Close closes the iterator func (it *mapIteratorImpl) Close() { close(it.stopCh) } // Entries returns a channel of map entries func (it *mapIteratorImpl) Entries() <-chan *MapEntry { return it.dataCh } // Iter returns an iterator to the map. This map // does not use re-entrant locks, so access or modification // to the map during iteration can cause a dead lock. func (cmap *ShardedConcurrentTxMap) Iter() MapIterator { iterator := newMapIterator() go func(iterator *mapIteratorImpl) { for i := 0; i < nShards; i++ { cmap.shards[i].RLock() for k, v := range cmap.shards[i].items { entry := &MapEntry{Key: k, Value: v} select { case iterator.dataCh <- entry: case <-iterator.stopCh: cmap.shards[i].RUnlock() close(iterator.dataCh) return } } cmap.shards[i].RUnlock() } close(iterator.dataCh) }(iterator) return iterator } // Len returns the number of items in the map func (cmap *ShardedConcurrentTxMap) Len() int { return int(atomic.LoadInt32(&cmap.size)) } func (cmap *ShardedConcurrentTxMap) getShard(key interface{}) *mapShard { shardIdx := cmap.hashfn(key) % nShards return &cmap.shards[shardIdx] } func (cmap *ShardedConcurrentTxMap) lazyInitShard(shard *mapShard) { if shard.items == nil { shard.items = make(map[interface{}]interface{}, cmap.initialCap) } }