lib/hashring/ring.go (149 lines of code) (raw):
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package hashring
import (
"context"
"fmt"
"sync"
"time"
"github.com/cenkalti/backoff"
"github.com/uber-go/tally"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/healthcheck"
"github.com/uber/kraken/lib/hostlist"
"github.com/uber/kraken/lib/hrw"
"github.com/uber/kraken/utils/log"
"github.com/uber/kraken/utils/stringset"
)
const (
	// _defaultWeight is the weight given to every address when it is added as
	// a node of the rendezvous hash.
	_defaultWeight = 100

	// _membershipWaitMetric is the name of the histogram which records how
	// long a successful WaitForContains call waited.
	_membershipWaitMetric = "membership_wait_duration"

	// _membershipWaitBuckets and _membershipWaitBucketWidth are the bucket
	// count and bucket width passed to tally.MustMakeLinearDurationBuckets
	// (starting at 0) for the membership wait histogram.
	_membershipWaitBuckets     = 10
	_membershipWaitBucketWidth = 10 * time.Second

	// _membershipWaitLogInterval is the minimum time between "not yet found"
	// warnings emitted while WaitForContains is still polling.
	_membershipWaitLogInterval = time.Minute
)
// Watcher allows clients to watch the ring for changes. Whenever membership
// changes, each registered Watcher is notified with the latest hosts.
type Watcher interface {
	// Notify is invoked during a refresh which detects that the set of member
	// addresses differs from the previous one. latest is a copy of the new
	// member set, so the receiver may keep or modify it freely.
	Notify(latest stringset.Set)
}
// Ring is a rendezvous hashing ring which calculates an ordered replica set
// of healthy addresses which own any given digest.
//
// Address membership within the ring is defined by a dynamic hostlist.List. On
// top of that, replica sets are filtered by the health status of their addresses.
// Membership and health status may be refreshed by using Monitor.
//
// Ring maintains the invariant that it is always non-empty and can always provide
// locations, although in some scenarios the provided locations are not guaranteed
// to be healthy (see Locations).
type Ring interface {
	// Locations returns the ordered addresses which own d. Healthy addresses
	// are returned when any exist; if no address in the ring is healthy, the
	// first owner of d is returned regardless of health.
	Locations(d core.Digest) []string

	// Contains reports whether addr is currently a member of the ring.
	Contains(addr string) bool

	// WaitForContains blocks until the ring contains addr, and returns an
	// error if the configured membership wait timeout elapses first.
	WaitForContains(addr string) error

	// Members returns a copy of all ring members, healthy and unhealthy.
	Members() stringset.Set

	// Monitor refreshes the ring at the configured interval and blocks until
	// stop is closed.
	Monitor(stop <-chan struct{})

	// Refresh updates the membership and health information of the ring.
	Refresh()
}
// ring is the Ring implementation constructed by New.
type ring struct {
	// config holds the ring settings after defaults have been applied in New.
	config Config
	// cluster is resolved on every Refresh to obtain the latest member set.
	cluster hostlist.List
	// filter is run on the resolved members on every Refresh to obtain the
	// healthy subset.
	filter healthcheck.Filter

	// mu guards addrs, hash and healthy: readers take the read lock, and
	// Refresh publishes new values under the write lock.
	mu sync.RWMutex // Protects the following fields:
	// addrs is the current member set (healthy and unhealthy).
	addrs stringset.Set
	// hash is the rendezvous hash built from addrs.
	hash *hrw.RendezvousHash
	// healthy is the result of the most recent health filter run.
	healthy stringset.Set

	// watchers is populated by options while New runs and iterated by Refresh.
	// NOTE(review): it is accessed without mu; this appears to rely on it
	// never being modified after construction -- confirm.
	watchers []Watcher
	// membershipWaitDuration records how long successful WaitForContains
	// calls waited.
	membershipWaitDuration tally.Histogram
}
// Option allows setting custom parameters for ring. Options are applied by New
// to the freshly constructed ring, before its first refresh.
type Option func(*ring)
// WithWatcher returns an Option which registers w as a watcher of the ring.
// It may be supplied several times to register several watchers; each one is
// appended in the order the options are applied.
func WithWatcher(w Watcher) Option {
	register := func(target *ring) {
		target.watchers = append(target.watchers, w)
	}
	return register
}
// New builds a Ring whose membership is defined by cluster and whose health
// information comes from filter. Metrics are emitted on scope tagged with
// module=hashring. The supplied options are applied first, then the ring is
// refreshed once so that it is populated before being returned.
func New(
	config Config,
	cluster hostlist.List,
	filter healthcheck.Filter,
	scope tally.Scope,
	opts ...Option,
) Ring {
	config.applyDefaults()

	scope = scope.Tagged(map[string]string{"module": "hashring"})
	waitBuckets := tally.MustMakeLinearDurationBuckets(
		0, _membershipWaitBucketWidth, _membershipWaitBuckets)

	r := &ring{
		config:                 config,
		cluster:                cluster,
		filter:                 filter,
		membershipWaitDuration: scope.Histogram(_membershipWaitMetric, waitBuckets),
	}

	// Options must run before the initial refresh so that watchers registered
	// through them observe the first membership.
	for _, apply := range opts {
		apply(r)
	}
	r.Refresh()

	members, healthy := r.addrs.ToSlice(), r.healthy.ToSlice()
	log.With("members", members, "healthy", healthy).Info("Hash ring initialised")

	return r
}
// Locations computes the ordered replica set of healthy addresses owning d.
//
// Owners are walked in hash order. Healthy owners among the first MaxReplica
// positions are collected; if none of those is healthy, the walk continues
// until the first healthy owner is found. When the ring has no healthy address
// at all, the first owner of d is returned regardless of its health, so the
// result is never empty.
func (r *ring) Locations(d core.Digest) []string {
	r.mu.RLock()
	defer r.mu.RUnlock()

	size := len(r.addrs)
	ordered := r.hash.GetOrderedNodes(d.ShardID(), size)
	if len(ordered) != size {
		// This should never happen.
		log.Fatal("invariant violation: ordered hash nodes not equal to cluster size")
	}

	// Nothing is healthy: fall back to the primary owner.
	if len(r.healthy) == 0 {
		return []string{ordered[0].Label}
	}

	var locs []string
	for i, node := range ordered {
		// Past the replica window we only keep going while empty-handed.
		if i >= r.config.MaxReplica && len(locs) > 0 {
			break
		}
		if label := node.Label; r.healthy.Has(label) {
			locs = append(locs, label)
		}
	}
	return locs
}
// Contains reports whether addr is currently a member of the ring, healthy or
// not. The membership set is read under the ring's read lock.
func (r *ring) Contains(addr string) bool {
	r.mu.RLock()
	found := r.addrs.Has(addr)
	r.mu.RUnlock()
	return found
}
// WaitForContains polls the ring, once per MembershipWaitInterval, until it
// contains addr. If MembershipWaitTimeout elapses first, an error wrapping the
// last lookup failure is returned. While waiting, a warning is logged at most
// once per _membershipWaitLogInterval; on success the total wait is recorded
// in the membership wait histogram.
func (r *ring) WaitForContains(addr string) error {
	timeout := r.config.MembershipWaitTimeout

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	begin := time.Now()
	lastWarned := begin

	// check is a single polling attempt: nil once addr is a member.
	check := func() error {
		if !r.Contains(addr) {
			if time.Since(lastWarned) >= _membershipWaitLogInterval {
				log.With("addr", addr, "elapsed", time.Since(begin)).Warn("Address not yet found in hash ring")
				lastWarned = time.Now()
			}
			return fmt.Errorf("address %s not found in ring", addr)
		}
		return nil
	}

	policy := backoff.WithContext(
		backoff.NewConstantBackOff(r.config.MembershipWaitInterval), ctx)

	err := backoff.Retry(check, policy)
	if err != nil {
		log.With("addr", addr, "error", err, "timeout", timeout).Error("Timed out waiting to find address in hash ring")
		return fmt.Errorf("timed out waiting for membership: %w", err)
	}

	waited := time.Since(begin)
	log.With("addr", addr, "duration", waited, "timeout", timeout).Info("Address found in hash ring after waiting")
	r.membershipWaitDuration.RecordDuration(waited)
	return nil
}
// Members returns every address in the ring, healthy or not. The returned set
// is a copy taken under the read lock, so callers may modify it without
// affecting the ring.
func (r *ring) Members() stringset.Set {
	r.mu.RLock()
	defer r.mu.RUnlock()

	snapshot := r.addrs.Copy()
	return snapshot
}
// Monitor refreshes the ring at the configured interval. Blocks until the
// provided stop channel is closed.
//
// The interval is measured from the end of one refresh to the start of the
// next. A single timer is reused across iterations and stopped on return, so
// closing stop does not leave a pending timer behind.
func (r *ring) Monitor(stop <-chan struct{}) {
	timer := time.NewTimer(r.config.RefreshInterval)
	defer timer.Stop()

	for {
		select {
		case <-stop:
			return
		case <-timer.C:
			r.Refresh()
			// The channel was just drained, so Reset is safe without Stop.
			timer.Reset(r.config.RefreshInterval)
		}
	}
}
// Refresh updates the membership and health information of r.
//
// It resolves the latest members from the cluster and runs the health filter
// on them. If membership differs from the ring's current members, a new
// rendezvous hash is built from the latest members and every watcher is
// notified with its own copy of them. Finally the new members, hash and
// healthy set are published together under the write lock.
//
// Watchers are notified before the new state is published, and without any
// lock held, so a watcher may safely call back into the ring.
func (r *ring) Refresh() {
	latest := r.cluster.Resolve()
	healthy := r.filter.Run(latest)

	// Snapshot the guarded fields under the read lock. Refresh is exported and
	// may run concurrently with Monitor's own refreshes, so reading them
	// unlocked would race with the write below. Concurrent refreshes remain
	// last-writer-wins, but each publishes a consistent (addrs, hash) pair.
	r.mu.RLock()
	current, hash := r.addrs, r.hash
	r.mu.RUnlock()

	if !stringset.Equal(current, latest) {
		// Membership has changed -- update hash nodes.
		hash = hrw.NewRendezvousHash(hrw.Murmur3Hash, hrw.UInt64ToFloat64)
		for addr := range latest {
			hash.AddNode(addr, _defaultWeight)
		}
		// Notify watchers.
		for _, w := range r.watchers {
			w.Notify(latest.Copy())
		}
	}

	r.mu.Lock()
	r.addrs = latest
	r.hash = hash
	r.healthy = healthy
	r.mu.Unlock()
}