swim/member.go (213 lines of code) (raw):

// Copyright (c) 2015 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package swim import ( "bytes" "encoding/binary" "math/rand" "strconv" "sync" "github.com/uber/ringpop-go/membership" "github.com/dgryski/go-farm" "github.com/uber/ringpop-go/util" ) const ( // Alive is the member "alive" state Alive = "alive" // Suspect is the member "suspect" state Suspect = "suspect" // Faulty is the member "faulty" state Faulty = "faulty" // Leave is the member "leave" state Leave = "leave" // Tombstone is the member "tombstone" state Tombstone = "tombstone" ) var ( byteBufferPool = sync.Pool{ New: func() interface{} { return new(bytes.Buffer) }, } ) // A Member is a member in the member list type Member struct { Address string `json:"address"` Status string `json:"status"` Incarnation int64 `json:"incarnationNumber"` Labels LabelMap `json:"labels,omitempty"` } // LabelMap is a type Used by Member to store the labels of a member. It stores // string to string mappings containing user data that is gossiped around in SWIM. type LabelMap map[string]string // GetAddress returns the Address of a member. func (m Member) GetAddress() string { return m.Address } // Label returns the value of a label named by key. The `has` boolean indicates // if the label was set on the member or not func (m Member) Label(key string) (value string, has bool) { value, has = m.Labels[key] return } // Identity returns the identity of a member. If a specific identity is not set // for the member the address will be used as the identity func (m Member) Identity() string { // Read the identity from the labels identity, set := m.Label(membership.IdentityLabelKey) if set { return identity } // return the member's address if there is no identity set return m.Address } // suspect interface func (m Member) address() string { return m.Address } func (m Member) incarnation() int64 { return m.Incarnation } func (m *Member) populateFromChange(c *Change) { m.Address = c.Address m.Incarnation = c.Incarnation m.Status = c.Status m.Labels = c.Labels } // checksumString fills a buffer that is passed with the contents that this node // needs to add to the checksum string. func (m Member) checksumString(b *bytes.Buffer) { b.WriteString(m.Address) b.WriteString(m.Status) b.WriteString(strconv.FormatInt(m.Incarnation, 10)) m.Labels.checksumString(b) } // copy creates a non-nil version of the LabelMap that copies all existing // entries of the map. This can be used to create a new version of the labels // that can be mutated before putting it on a Member to make updates without // mutating the map that was already on a Member func (l LabelMap) copy() (result LabelMap) { result = make(map[string]string, len(l)) for key, value := range l { result[key] = value } return } // checksumString adds the label portion of the checksum to the buffer that is // passed in. The string will not be appended in the case where labels are not // set on this member. This is for backwards compatibility reasons with older // versions. func (l LabelMap) checksumString(b *bytes.Buffer) { checksum := l.checksum() if checksum == 0 { // we don't write the checksum of the labels if the value of the checksum // is 0 (zero) to be backwards compatible with ringpop applications on // an older version. This only works if the newer version does not use // labels return } // write #labels<checksum> to the buffer which will be appended to the // checksum string for the node. b.WriteString("#labels") b.WriteString(strconv.Itoa(int(checksum))) } // checksum computes a checksum for the labels. It will return 0 (zero) when no // labels are set, but 0 does not indicate that no labels are set. It could be // possible that 0 is computed as the checksum. func (l LabelMap) checksum() int32 { var checksum uint32 lb := byteBufferPool.Get().(*bytes.Buffer) // make sure the buffer is empty after getting a buffer from our pool lb.Reset() for key, value := range l { // The bytes used for checksumming are the following: // <4 bytes: length of key> binary.Write(lb, binary.BigEndian, int32(len(key))) // <n bytes: key> lb.WriteString(key) // <4 bytes: length of value> binary.Write(lb, binary.BigEndian, int32(len(value))) // <n bytes: value> lb.WriteString(value) // The checksum is calculated by xorring the checksums of all individual // labels. This makes the checksum of the labels order independant. This // is easier compared to sorting the labels by their key because of two // reasons. // 1. It saves memory allocations to have the keys in a slice. // 2. This method is guaranteed to be locale independant where sorting // of strings might be different on different locale settings. This // would cause indefinite fullsync storms because two ringpops would // never agree on membership checksums. checksum = checksum ^ farm.Fingerprint32(lb.Bytes()) lb.Reset() } // give the buffer back to the pool byteBufferPool.Put(lb) var signedChecksum int32 // This line converts an unsigned integer to a signed integer (32 bits). // It is needed to be able to calculate the same value in nodejs for // ringpop-node and the integration tests in ringpop-common. signedChecksum = int32(checksum>>1)<<1 | int32(checksum&uint32(1)) return signedChecksum } // shuffles slice of members pseudo-randomly, returns new slice func shuffle(members []*Member) []*Member { newMembers := make([]*Member, len(members), cap(members)) newIndexes := rand.Perm(len(members)) for o, n := range newIndexes { newMembers[n] = members[o] } return newMembers } // shouldProcessGossip evaluates the rules of swim and returns whether the // gossip should be processed. eg. Copy the memberstate of the gossip to the // known memberstate in the memberlist (creating the member when is does not // exist). func shouldProcessGossip(old *Member, gossip *Member) bool { // tombstones will not be accepted if we have no knowledge about the member if gossip.Status == Tombstone && old == nil { return false } // accept the gossip if we learn about the member through a gossip if old == nil { return true } // gossips with a higher incarnation number will always be accepted since // it is a newer version of the member than we know if gossip.Incarnation > old.Incarnation { return true } // gossips with a lower incarnation number will never be accepted as we // have a newer version of the member already if gossip.Incarnation < old.Incarnation { return false } // now we know that the incarnation number of the gossip and the current // view of the member are the same 'age'. Lets evaluate member state to see // which version to pick // if the status of the gossip takes precedence over the status of our // current member we will accept the gossip. if statePrecedence(gossip.Status) > statePrecedence(old.Status) { return true } if statePrecedence(gossip.Status) < statePrecedence(old.Status) { return false } // keep the checksum values in local variables. The checksums are not cached // and require some compute to get them, better to do once than twice. gossipLabelsChecksum := gossip.Labels.checksum() oldLabelsChecksum := old.Labels.checksum() // Gossips with a higher checksum should be processed to let the cluster // converge to the labels that cause the highest checksum. if gossipLabelsChecksum > oldLabelsChecksum { return true } // If the gossipped labels have a lower checksum we do want to keep the // current memberstate in our memberlist, therefore the gossip should not be // processed. if gossipLabelsChecksum < oldLabelsChecksum { return false } // we prefer the old member over the gossiped member if they have the same // internal state. This prevents the gossip to be continuously be gossiped // around in the network return false } func statePrecedence(s string) int { switch s { case Alive: return 0 case Suspect: return 1 case Faulty: return 2 case Leave: return 3 case Tombstone: return 4 default: // unknown states will never have precedence return -1 } } func (m *Member) isReachable() bool { return m.Status == Alive || m.Status == Suspect } // A Change is a change a member to be applied type Change struct { Source string `json:"source"` SourceIncarnation int64 `json:"sourceIncarnationNumber"` Address string `json:"address"` Incarnation int64 `json:"incarnationNumber"` Status string `json:"status"` Tombstone bool `json:"tombstone,omitempty"` Labels map[string]string `json:"labels,omitempty"` // Use util.Timestamp for bi-direction binding to time encoded as // integer Unix timestamp in JSON Timestamp util.Timestamp `json:"timestamp"` } // validateIncoming validates incoming changes before they are passed into the // swim state machine. This is usefull to make late adjustments to incoming // changes to transform some legacy wire protocol changes into new swim terminology func (c Change) validateIncoming() Change { if c.Status == Faulty && c.Tombstone { c.Status = Tombstone } return c } // validateOutgoing validates outgoing changes before they are passed to the module // responsible for sending the change to the other side. This can be used to make sure // that our changes are parsable by older version of ringpop-go to prevent unwanted // behavior when incompatible changes are sent to older versions. func (c Change) validateOutgoing() Change { if c.Status == Tombstone { c.Status = Faulty c.Tombstone = true } return c } func (c *Change) populateSubject(m *Member) { if m == nil { return } c.Address = m.Address c.Incarnation = m.Incarnation c.Status = m.Status c.Labels = m.Labels } func (c *Change) populateSource(m *Member) { if m == nil { return } c.Source = m.Address c.SourceIncarnation = m.Incarnation } func (c *Change) scrubSource() { c.Source = "" c.SourceIncarnation = 0 } // suspect interface func (c Change) address() string { return c.Address } func (c Change) incarnation() int64 { return c.Incarnation } func (c Change) overrides(c2 Change) bool { if c.Incarnation > c2.Incarnation { return true } if c.Incarnation < c2.Incarnation { return false } return statePrecedence(c.Status) > statePrecedence(c2.Status) } func (c Change) isPingable() bool { return c.Status == Alive || c.Status == Suspect }