hashring/hashring.go (268 lines of code) (raw):
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
// Package hashring provides a hashring implementation that uses a red-black
// tree.
package hashring
import (
"fmt"
"strings"
"sync"
"github.com/uber/ringpop-go/events"
"github.com/uber/ringpop-go/logging"
"github.com/uber/ringpop-go/membership"
"github.com/uber-common/bark"
)
// Configuration is a configuration struct that can be passed to the
// Ringpop constructor to customize hash ring options.
type Configuration struct {
	// ReplicaPoints is the number of positions each node is assigned on the
	// ring. A larger number gives a more even key distribution, at the cost
	// of more computation when building or traversing the ring (typically on
	// lookups or membership changes).
	ReplicaPoints int
}
// replicaPoint is a single position on the hashring together with the member
// that owns that position.
type replicaPoint struct {
	// hash of the point in the hashring
	hash int

	// identity of the member that owns this replicaPoint.
	identity string

	// address of the member that owns this replicaPoint.
	address string

	// index of the replica point for a member
	index int
}

// Compare orders replica points by hash, then by address and finally by
// replica index. It returns a negative number when r sorts before other, a
// positive number when r sorts after other, and 0 when hash, address and
// index are all equal. The identity field does not take part in the ordering.
//
// other must hold a replicaPoint; any other dynamic type makes the type
// assertion panic.
func (r replicaPoint) Compare(other interface{}) (result int) {
	o := other.(replicaPoint)

	// Compare explicitly instead of subtracting: a subtraction silently wraps
	// around when the operands are far enough apart and would then report the
	// opposite order.
	switch {
	case r.hash < o.hash:
		return -1
	case r.hash > o.hash:
		return 1
	}

	// Equal hashes: fall back to the owning address.
	if result = strings.Compare(r.address, o.address); result != 0 {
		return result
	}

	// Same hash and address: the replica index is the final tie-breaker.
	switch {
	case r.index < o.index:
		return -1
	case r.index > o.index:
		return 1
	}
	return 0
}
// HashRing stores strings on a consistent hash ring. HashRing internally uses
// a Red-Black Tree to achieve O(log N) lookup and insertion time.
type HashRing struct {
	// RWMutex guards the fields below. The exported methods acquire it; the
	// helpers whose name ends in NoLock expect the caller to already hold it.
	sync.RWMutex
	// SyncEventEmitter is used to emit events.RingChangedEvent and
	// events.RingChecksumEvent when the ring is modified.
	events.SyncEventEmitter

	// hashfunc maps a string (a lookup key or a replica point name) to its
	// position on the ring.
	hashfunc func(string) int
	// replicaPoints is the number of points every member gets on the ring.
	replicaPoints int

	// serverSet contains the address of every member currently on the ring.
	serverSet map[string]struct{}
	// tree holds the replica points of all members, each mapped to the
	// address of the member that owns it.
	tree *redBlackTree

	// legacyChecksum is the old checksum that is calculated only from the
	// identities being added.
	legacyChecksum uint32

	// checksummers is a map of named Checksum calculators for the hashring
	checksummers map[string]Checksummer

	// checksums is a map containing the checksums that are representing this
	// hashring. The map should never be altered in place so it is safe to pass
	// a copy to components that need the checksums
	checksums map[string]uint32

	// logger is used for debug output when the checksums change.
	logger bark.Logger
}
// New creates an empty HashRing. hashfunc is the hash used to place both
// replica points and lookup keys on the ring; replicaPoints is the number of
// points every member added to the ring will occupy.
func New(hashfunc func([]byte) uint32, replicaPoints int) *HashRing {
	// The ring works with string keys and int positions, so adapt the
	// byte-oriented hash once here.
	hashString := func(s string) int {
		return int(hashfunc([]byte(s)))
	}

	return &HashRing{
		hashfunc:      hashString,
		replicaPoints: replicaPoints,
		serverSet:     make(map[string]struct{}),
		tree:          &redBlackTree{},
		checksummers: map[string]Checksummer{
			"replica": &replicaPointChecksummer{},
		},
		logger: logging.Logger("ring"),
	}
}
// Checksum returns the legacy checksum of the HashRing, taken under the read
// lock. Compare successive values to find out whether the ring was mutated.
func (r *HashRing) Checksum() (checksum uint32) {
	r.RLock()
	defer r.RUnlock()

	return r.legacyChecksum
}
// Checksums returns the current checksums of the HashRing, keyed by the name
// of the algorithm that produced each of them.
func (r *HashRing) Checksums() (checksums map[string]uint32) {
	// The map itself is treated as immutable, but the field referring to it
	// is replaced whenever the checksums are recomputed, so reading the field
	// still needs the read lock.
	r.RLock()
	defer r.RUnlock()

	return r.checksums
}
// computeChecksumsNoLock recalculates every configured checksum as well as
// the legacy checksum, stores the results in a freshly allocated map and
// emits an events.RingChecksumEvent carrying the old and new values. When any
// value differs from the previous one a debug line is logged.
//
// This function isn't thread-safe, only call it when the HashRing is locked.
func (r *HashRing) computeChecksumsNoLock() {
	previous := r.checksums

	// Publish a brand new map instead of updating the old one in place, so
	// that maps handed out earlier stay untouched.
	current := make(map[string]uint32)
	r.checksums = current

	dirty := false

	// named checksums
	for name, checksummer := range r.checksummers {
		before := previous[name]
		after := checksummer.Checksum(r)
		current[name] = after
		if before != after {
			dirty = true
		}
	}

	// legacy checksum, based on identities only
	legacy := identityChecksummer{}
	previousLegacy := r.legacyChecksum
	r.legacyChecksum = legacy.Checksum(r)
	if r.legacyChecksum != previousLegacy {
		dirty = true
	}

	if dirty {
		fields := bark.Fields{
			"checksum":    r.legacyChecksum,
			"oldChecksum": previousLegacy,
			"checksums":   r.checksums,
		}
		r.logger.WithFields(fields).Debug("ringpop ring computed new checksum")
	}

	// The event is emitted on every recomputation, changed or not.
	r.EmitEvent(events.RingChecksumEvent{
		OldChecksum:  previousLegacy,
		NewChecksum:  r.legacyChecksum,
		OldChecksums: previous,
		NewChecksums: r.checksums,
	})
}
// replicaPointForServer builds the replica point with index replica for the
// given member. The point is placed on the ring by hashing a name derived
// from the member's identity and the replica index.
func (r *HashRing) replicaPointForServer(server membership.Member, replica int) replicaPoint {
	identity := server.Identity()

	// Members that carry an identity of their own get "<identity>#<replica>".
	// Members whose identity equals their address keep the older
	// "<identity><replica>" form for backwards compatibility, even though it
	// can collide for instances on the same host: port 2100 with replica 10
	// yields the same name as port 21001 with replica 0.
	separator := "#"
	if identity == server.GetAddress() {
		separator = ""
	}
	name := fmt.Sprintf("%s%s%v", identity, separator, replica)

	point := replicaPoint{
		hash:     r.hashfunc(name),
		identity: identity,
	}
	point.address = server.GetAddress()
	point.index = replica
	return point
}
// AddMembers adds the given members, and thereby their replica points, to the
// HashRing. Members whose address is already on the ring are skipped.
//
// When at least one member was added the checksums are recomputed and an
// events.RingChangedEvent listing the added addresses is emitted. The return
// value reports whether the ring changed.
func (r *HashRing) AddMembers(members ...membership.Member) bool {
	r.Lock()
	// Release the lock through defer so the ring is not left locked forever
	// when code invoked during checksum computation or event emission panics.
	defer r.Unlock()

	var added []string
	for _, member := range members {
		if r.addMemberNoLock(member) {
			added = append(added, member.GetAddress())
		}
	}

	if len(added) == 0 {
		return false
	}

	r.computeChecksumsNoLock()
	r.EmitEvent(events.RingChangedEvent{
		ServersAdded: added,
	})
	return true
}
// addMemberNoLock registers the member's address and inserts all of its
// replica points into the tree. It returns false, leaving the ring untouched,
// when the address is already registered.
//
// This function isn't thread-safe, only call it when the HashRing is locked.
func (r *HashRing) addMemberNoLock(member membership.Member) bool {
	address := member.GetAddress()

	if _, known := r.serverSet[address]; known {
		return false
	}
	r.serverSet[address] = struct{}{}

	// every replica point of the member maps to the member's address
	for replica := 0; replica < r.replicaPoints; replica++ {
		point := r.replicaPointForServer(member, replica)
		r.tree.Insert(point, address)
	}

	return true
}
// RemoveMembers removes the given members, and thereby their replica points,
// from the HashRing. Members whose address is not on the ring are skipped.
//
// When at least one member was removed the checksums are recomputed and an
// events.RingChangedEvent listing the removed addresses is emitted. The
// return value reports whether the ring changed.
func (r *HashRing) RemoveMembers(members ...membership.Member) bool {
	r.Lock()
	// Release the lock through defer so the ring is not left locked forever
	// when code invoked during checksum computation or event emission panics.
	defer r.Unlock()

	var removed []string
	for _, member := range members {
		if r.removeMemberNoLock(member) {
			removed = append(removed, member.GetAddress())
		}
	}

	if len(removed) == 0 {
		return false
	}

	r.computeChecksumsNoLock()
	r.EmitEvent(events.RingChangedEvent{
		ServersRemoved: removed,
	})
	return true
}
// removeMemberNoLock unregisters the member's address and deletes all of its
// replica points from the tree. It returns false, leaving the ring untouched,
// when the address is not registered.
//
// This function isn't thread-safe, only call it when the HashRing is locked.
func (r *HashRing) removeMemberNoLock(member membership.Member) bool {
	address := member.GetAddress()

	if _, known := r.serverSet[address]; !known {
		return false
	}
	delete(r.serverSet, address)

	// The points are recomputed from the member rather than looked up by
	// address, so they are derived from the identity the member reports now.
	for replica := 0; replica < r.replicaPoints; replica++ {
		point := r.replicaPointForServer(member, replica)
		r.tree.Delete(point)
	}

	return true
}
// ProcessMembershipChanges takes a slice of membership.MemberChange's and
// applies them to the hashring by adding and removing members accordingly to
// the changes passed in:
//
//   - only After set: the member is added;
//   - only Before set: the member is removed;
//   - both set with different identities: the member is removed and re-added
//     so that its replica points follow the new identity;
//   - both set with equal identities, or neither set: the change is ignored.
//
// When anything changed the checksums are recomputed and a single
// events.RingChangedEvent describing all changes is emitted.
func (r *HashRing) ProcessMembershipChanges(changes []membership.MemberChange) {
	r.Lock()
	// Release the lock through defer so the ring is not left locked forever
	// when code invoked during checksum computation or event emission panics.
	defer r.Unlock()

	var added, updated, removed []string
	for _, change := range changes {
		switch {
		case change.Before == nil && change.After == nil:
			// Empty change: there is nothing to apply, and the identity
			// comparison below would dereference nil.

		case change.Before == nil:
			// new member
			if r.addMemberNoLock(change.After) {
				added = append(added, change.After.GetAddress())
			}

		case change.After == nil:
			// remove member
			if r.removeMemberNoLock(change.Before) {
				removed = append(removed, change.Before.GetAddress())
			}

		case change.Before.Identity() != change.After.Identity():
			// identity has changed, member needs to be removed and readded
			r.removeMemberNoLock(change.Before)
			r.addMemberNoLock(change.After)
			updated = append(updated, change.After.GetAddress())
		}
	}

	if len(added) == 0 && len(updated) == 0 && len(removed) == 0 {
		return
	}

	// recompute checksums on changes
	r.computeChecksumsNoLock()
	r.EmitEvent(events.RingChangedEvent{
		ServersAdded:   added,
		ServersUpdated: updated,
		ServersRemoved: removed,
	})
}
// HasServer reports whether a member with the given address is currently on
// the HashRing.
func (r *HashRing) HasServer(server string) bool {
	r.RLock()
	defer r.RUnlock()

	_, present := r.serverSet[server]
	return present
}
// Servers returns the addresses of all servers on the HashRing as a newly
// allocated slice, in no particular order.
func (r *HashRing) Servers() []string {
	r.RLock()
	defer r.RUnlock()

	return r.copyServersNoLock()
}
// copyServersNoLock returns the addresses in the server set as a new slice.
// The order follows map iteration and is therefore unspecified.
//
// This function isn't thread-safe, only call it when the HashRing is locked.
func (r *HashRing) copyServersNoLock() []string {
	addresses := make([]string, len(r.serverSet))

	next := 0
	for address := range r.serverSet {
		addresses[next] = address
		next++
	}

	return addresses
}
// ServerCount returns how many servers are currently on the HashRing.
func (r *HashRing) ServerCount() int {
	r.RLock()
	defer r.RUnlock()

	return len(r.serverSet)
}
// Lookup returns the server that owns the given key. The boolean is false,
// and the returned string empty, when no owner could be found.
func (r *HashRing) Lookup(key string) (string, bool) {
	if owners := r.LookupN(key, 1); len(owners) > 0 {
		return owners[0], true
	}
	return "", false
}
// LookupN returns the N servers that own the given key. Duplicates in the form
// of virtual nodes are skipped to maintain a list of unique servers. If there
// are less servers than N, we simply return all existing servers.
func (r *HashRing) LookupN(key string, n int) []string {
	r.RLock()
	// Release the lock through defer: the lookup runs the user supplied hash
	// function, and a panic in there must not leave the ring read-locked,
	// which would block every later writer.
	defer r.RUnlock()

	return r.lookupNNoLock(key, n)
}
// lookupNNoLock returns up to n unique servers for the given key, in the
// order in which they are encountered when walking the ring from the key's
// hash. It returns nil when no server is found or when n is negative.
//
// This function isn't thread-safe, only call it when the HashRing is locked.
func (r *HashRing) lookupNNoLock(key string, n int) []string {
	// A negative count can never be satisfied, and it would make the
	// pre-allocation below panic.
	if n < 0 {
		return nil
	}

	// Do not reserve room for more servers than the ring knows about, so that
	// asking for a very large n ("give me everything") does not trigger a
	// very large allocation. This only bounds the initial capacity; n itself
	// is passed to the tree unchanged.
	capacity := n
	if known := len(r.serverSet); known < capacity {
		capacity = known
	}

	hash := r.hashfunc(key)
	unique := make(map[valuetype]struct{})
	orderedUnique := make([]valuetype, 0, capacity)

	// lookup N unique servers from the red-black tree. If we have not
	// collected all the servers we want, we have reached the
	// end of the red-black tree and we need to loop around and inspect the
	// tree starting at 0.
	r.tree.LookupOrderedNUniqueAt(n, replicaPoint{hash: hash}, unique, &orderedUnique)
	if len(unique) < n {
		r.tree.LookupOrderedNUniqueAt(n, replicaPoint{hash: 0}, unique, &orderedUnique)
	}

	// Keep returning nil, not an empty slice, when nothing was found.
	if len(orderedUnique) == 0 {
		return nil
	}

	servers := make([]string, 0, len(orderedUnique))
	for _, server := range orderedUnique {
		servers = append(servers, server.(string))
	}
	return servers
}