peer.go (417 lines of code) (raw):

// Copyright (c) 2015 Uber Technologies, Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package tchannel

import (
	"container/heap"
	"errors"
	"strings"
	"sync"
	"time"

	"github.com/uber/tchannel-go/trand"

	"go.uber.org/atomic"
	"golang.org/x/net/context"
)

var (
	// ErrInvalidConnectionState indicates that the connection is not in a valid state.
	// This may be due to a race between selecting the connection and it closing, so
	// it is a network failure that can be retried.
	ErrInvalidConnectionState = NewSystemError(ErrCodeNetwork, "connection is in an invalid state")

	// ErrNoPeers indicates that there are no peers.
	ErrNoPeers = errors.New("no peers available")

	// ErrPeerNotFound indicates that the specified peer was not found.
	ErrPeerNotFound = errors.New("peer not found")

	// ErrNoNewPeers indicates that no previously unselected peer is available.
	ErrNoNewPeers = errors.New("no new peer available")

	// peerRng is the package-level random source; in this file it is used to
	// pick the starting offset when scanning a peer's connections.
	peerRng = trand.NewSeeded()
)

// Connectable is the interface used by peers to create connections.
type Connectable interface {
	// Connect tries to connect to the given hostPort.
	Connect(ctx context.Context, hostPort string) (*Connection, error)
	// Logger returns the logger to use.
	Logger() Logger
}

// PeerList maintains a list of Peers.
//
// The embedded RWMutex guards peersByHostPort, peerHeap and scoreCalculator.
// Peers are indexed twice: by hostPort in the map, and by score in the heap
// that drives selection.
type PeerList struct {
	sync.RWMutex

	// parent is the root list that peers are obtained from when they are added.
	parent *RootPeerList
	// peersByHostPort maps a hostPort to its scored entry in peerHeap.
	peersByHostPort map[string]*peerScore
	// peerHeap holds the same entries as peersByHostPort, ordered for selection.
	peerHeap *peerHeap
	// scoreCalculator computes the score stored on each peerScore.
	scoreCalculator ScoreCalculator
	// NOTE(review): lastSelected is not referenced anywhere in this file.
	lastSelected uint64
}

// newPeerList creates an empty PeerList backed by the given root list, using
// the calculator returned by newPreferIncomingCalculator as its initial strategy.
func newPeerList(root *RootPeerList) *PeerList {
	return &PeerList{
		parent:          root,
		peersByHostPort: make(map[string]*peerScore),
		scoreCalculator: newPreferIncomingCalculator(),
		peerHeap:        newPeerHeap(),
	}
}

// SetStrategy sets customized peer selection strategy.
// Every peer already in the list is rescored with the new calculator while
// the write lock is held.
func (l *PeerList) SetStrategy(sc ScoreCalculator) {
	l.Lock()
	defer l.Unlock()

	l.scoreCalculator = sc
	for _, ps := range l.peersByHostPort {
		newScore := l.scoreCalculator.GetScore(ps.Peer)
		l.updatePeer(ps, newScore)
	}
}

// Siblings don't share peer lists (though they take care not to double-connect
// to the same hosts).
//
// newSibling returns a fresh, empty PeerList that shares only this list's parent.
func (l *PeerList) newSibling() *PeerList {
	sib := newPeerList(l.parent)
	return sib
}

// Add adds a peer to the list if it does not exist, or returns any existing peer.
func (l *PeerList) Add(hostPort string) *Peer {
	// Fast path: look the peer up under the read lock only.
	if ps, ok := l.exists(hostPort); ok {
		return ps.Peer
	}
	l.Lock()
	defer l.Unlock()

	// Re-check under the write lock: another goroutine may have added the
	// peer between the read-locked lookup above and acquiring the write lock.
	if p, ok := l.peersByHostPort[hostPort]; ok {
		return p.Peer
	}

	// The peer itself comes from the parent list; this list only records a
	// reference to it (addSC) plus its score.
	p := l.parent.Add(hostPort)
	p.addSC()
	ps := newPeerScore(p, l.scoreCalculator.GetScore(p))

	l.peersByHostPort[hostPort] = ps
	l.peerHeap.addPeer(ps)
	return p
}

// GetNew returns a peer whose hostPort is not in prevSelected.
// It returns ErrNoPeers if the list is empty, and ErrNoNewPeers if every peer
// in the list has already been selected.
func (l *PeerList) GetNew(prevSelected map[string]struct{}) (*Peer, error) {
	l.Lock()
	defer l.Unlock()
	if l.peerHeap.Len() == 0 {
		return nil, ErrNoPeers
	}

	// Select a peer, avoiding previously selected peers. The first pass also
	// avoids peers whose getHost(hostPort) value is in prevSelected; if that
	// finds nothing, the second pass only avoids exact hostPort matches.
	peer := l.choosePeer(prevSelected, true /* avoidHost */)
	if peer == nil {
		peer = l.choosePeer(prevSelected, false /* avoidHost */)
	}
	if peer == nil {
		return nil, ErrNoNewPeers
	}
	return peer, nil
}

// Get returns a peer from the peer list, or nil if none can be found,
// will avoid previously selected peers if possible.
// When nil is returned the error is ErrNoPeers.
func (l *PeerList) Get(prevSelected map[string]struct{}) (*Peer, error) {
	peer, err := l.GetNew(prevSelected)
	if err == ErrNoNewPeers {
		// Every peer was previously selected, so repick from the whole list.
		// GetNew released the lock on return, so the list may have changed
		// (or emptied) by the time it is re-acquired here.
		l.Lock()
		peer = l.choosePeer(nil, false /* avoidHost */)
		l.Unlock()
	} else if err != nil {
		return nil, err
	}
	if peer == nil {
		return nil, ErrNoPeers
	}
	return peer, nil
}

// Remove removes a peer from the peer list. It returns an error if the peer cannot be found.
// Remove does not affect connections to the peer in any way.
func (l *PeerList) Remove(hostPort string) error {
	l.Lock()
	defer l.Unlock()

	p, ok := l.peersByHostPort[hostPort]
	if !ok {
		return ErrPeerNotFound
	}

	// Drop this list's reference to the peer, then remove it from both indexes.
	p.delSC()
	delete(l.peersByHostPort, hostPort)
	l.peerHeap.removePeer(p)

	return nil
}

// choosePeer pops peers off the heap until it finds one that is acceptable
// given prevSelected, and returns it (or nil if none is acceptable). When
// avoidHost is true, peers whose getHost(hostPort) value is a key of
// prevSelected are rejected as well.
//
// It mutates the heap; the callers in this file hold the write lock.
func (l *PeerList) choosePeer(prevSelected map[string]struct{}, avoidHost bool) *Peer {
	// psPopList collects the rejected peers so they can be put back afterwards.
	var psPopList []*peerScore
	var ps *peerScore

	canChoosePeer := func(hostPort string) bool {
		if _, ok := prevSelected[hostPort]; ok {
			return false
		}
		if avoidHost {
			if _, ok := prevSelected[getHost(hostPort)]; ok {
				return false
			}
		}
		return true
	}

	// Capture the size up front: the heap shrinks as peers are popped.
	size := l.peerHeap.Len()
	for i := 0; i < size; i++ {
		popped := l.peerHeap.popPeer()

		if canChoosePeer(popped.HostPort()) {
			ps = popped
			break
		}
		psPopList = append(psPopList, popped)
	}

	// Rejected peers go back through heap.Push, while the chosen peer below
	// goes back through pushPeer.
	// NOTE(review): presumably pushPeer assigns a fresh tiebreak order and
	// heap.Push keeps the existing one - confirm against peerHeap.
	for _, p := range psPopList {
		heap.Push(l.peerHeap, p)
	}

	if ps == nil {
		return nil
	}

	l.peerHeap.pushPeer(ps)
	ps.chosenCount.Inc()
	return ps.Peer
}

// GetOrAdd returns a peer for the given hostPort, creating one if it doesn't yet exist.
func (l *PeerList) GetOrAdd(hostPort string) *Peer { // TODO: remove calls to GetOrAdd, use Add instead return l.Add(hostPort) } // Copy returns a copy of the PeerList as a map from hostPort to peer. func (l *PeerList) Copy() map[string]*Peer { l.RLock() defer l.RUnlock() listCopy := make(map[string]*Peer) for k, v := range l.peersByHostPort { listCopy[k] = v.Peer } return listCopy } // Len returns the length of the PeerList. func (l *PeerList) Len() int { l.RLock() defer l.RUnlock() return l.peerHeap.Len() } // exists checks if a hostport exists in the peer list. func (l *PeerList) exists(hostPort string) (*peerScore, bool) { l.RLock() ps, ok := l.peersByHostPort[hostPort] l.RUnlock() return ps, ok } // getPeerScore is called to find the peer and its score from a host port key. // Note that at least a Read lock must be held to call this function. func (l *PeerList) getPeerScore(hostPort string) (*peerScore, uint64, bool) { ps, ok := l.peersByHostPort[hostPort] if !ok { return nil, 0, false } return ps, ps.score, ok } // onPeerChange is called when there is a change that may cause the peer's score to change. // The new score is calculated, and the peer heap is updated with the new score if the score changes. func (l *PeerList) onPeerChange(p *Peer) { l.RLock() ps, psScore, ok := l.getPeerScore(p.hostPort) sc := l.scoreCalculator l.RUnlock() if !ok { return } newScore := sc.GetScore(ps.Peer) if newScore == psScore { return } l.Lock() l.updatePeer(ps, newScore) l.Unlock() } // updatePeer is called to update the score of the peer given the existing score. // Note that a Write lock must be held to call this function. func (l *PeerList) updatePeer(ps *peerScore, newScore uint64) { if ps.score == newScore { return } ps.score = newScore l.peerHeap.updatePeer(ps) } // peerScore represents a peer and scoring for the peer heap. // It is not safe for concurrent access, it should only be used through the PeerList. 
type peerScore struct { *Peer // score according to the current peer list's ScoreCalculator. score uint64 // index of the peerScore in the peerHeap. Used to interact with container/heap. index int // order is the tiebreaker for when score is equal. It is set when a peer // is pushed to the heap based on peerHeap.order with jitter. order uint64 } func newPeerScore(p *Peer, score uint64) *peerScore { return &peerScore{ Peer: p, score: score, index: -1, } } // Peer represents a single autobahn service or client with a unique host:port. type Peer struct { sync.RWMutex channel Connectable hostPort string onStatusChanged func(*Peer) onClosedConnRemoved func(*Peer) // scCount is the number of subchannels that this peer is added to. scCount uint32 // connections are mutable, and are protected by the mutex. newConnLock sync.Mutex inboundConnections []*Connection outboundConnections []*Connection chosenCount atomic.Uint64 // onUpdate is a test-only hook. onUpdate func(*Peer) } func newPeer(channel Connectable, hostPort string, onStatusChanged func(*Peer), onClosedConnRemoved func(*Peer)) *Peer { if hostPort == "" { panic("Cannot create peer with blank hostPort") } if onStatusChanged == nil { onStatusChanged = noopOnStatusChanged } return &Peer{ channel: channel, hostPort: hostPort, onStatusChanged: onStatusChanged, onClosedConnRemoved: onClosedConnRemoved, } } // HostPort returns the host:port used to connect to this peer. func (p *Peer) HostPort() string { return p.hostPort } // getConn treats inbound and outbound connections as a single virtual list // that can be indexed. The peer must be read-locked. 
func (p *Peer) getConn(i int) *Connection { inboundLen := len(p.inboundConnections) if i < inboundLen { return p.inboundConnections[i] } return p.outboundConnections[i-inboundLen] } func (p *Peer) getActiveConnLocked() (*Connection, bool) { allConns := len(p.inboundConnections) + len(p.outboundConnections) if allConns == 0 { return nil, false } // We cycle through the connection list, starting at a random point // to avoid always choosing the same connection. var startOffset int if allConns > 1 { startOffset = peerRng.Intn(allConns) } for i := 0; i < allConns; i++ { connIndex := (i + startOffset) % allConns if conn := p.getConn(connIndex); conn.IsActive() { return conn, true } } return nil, false } // getActiveConn will randomly select an active connection. // TODO(prashant): Should we clear inactive connections? // TODO(prashant): Do we want some sort of scoring for connections? func (p *Peer) getActiveConn() (*Connection, bool) { p.RLock() conn, ok := p.getActiveConnLocked() p.RUnlock() return conn, ok } // GetConnection returns an active connection to this peer. If no active connections // are found, it will create a new outbound connection and return it. func (p *Peer) GetConnection(ctx context.Context) (*Connection, error) { if activeConn, ok := p.getActiveConn(); ok { return activeConn, nil } // Lock here to restrict new connection creation attempts to one goroutine p.newConnLock.Lock() defer p.newConnLock.Unlock() // Check active connections again in case someone else got ahead of us. if activeConn, ok := p.getActiveConn(); ok { return activeConn, nil } // No active connections, make a new outgoing connection. return p.Connect(ctx) } // getConnectionRelay gets a connection, and uses the given timeout to lazily // create a context if a new connection is required. 
func (p *Peer) getConnectionRelay(callTimeout, relayMaxConnTimeout time.Duration) (*Connection, error) { if conn, ok := p.getActiveConn(); ok { return conn, nil } // Lock here to restrict new connection creation attempts to one goroutine p.newConnLock.Lock() defer p.newConnLock.Unlock() // Check active connections again in case someone else got ahead of us. if activeConn, ok := p.getActiveConn(); ok { return activeConn, nil } // Use the lower timeout value of the call timeout and the relay connection timeout. timeout := callTimeout if timeout > relayMaxConnTimeout && relayMaxConnTimeout > 0 { timeout = relayMaxConnTimeout } // When the relay creates outbound connections, we don't want those services // to ever connect back to us and send us traffic. We hide the host:port // so that service instances on remote machines don't try to connect back // and don't try to send Hyperbahn traffic on this connection. ctx, cancel := NewContextBuilder(timeout).HideListeningOnOutbound().Build() defer cancel() return p.Connect(ctx) } // addSC adds a reference to a peer from a subchannel (e.g. peer list). func (p *Peer) addSC() { p.Lock() p.scCount++ p.Unlock() } // delSC removes a reference to a peer from a subchannel (e.g. peer list). func (p *Peer) delSC() { p.Lock() p.scCount-- p.Unlock() } // canRemove returns whether this peer can be safely removed from the root peer list. func (p *Peer) canRemove() bool { p.RLock() count := len(p.inboundConnections) + len(p.outboundConnections) + int(p.scCount) p.RUnlock() return count == 0 } // addConnection adds an active connection to the peer's connection list. // If a connection is not active, returns ErrInvalidConnectionState. func (p *Peer) addConnection(c *Connection, direction connectionDirection) error { conns := p.connectionsFor(direction) if c.readState() != connectionActive { return ErrInvalidConnectionState } p.Lock() *conns = append(*conns, c) p.Unlock() // Inform third parties that a peer gained a connection. 
p.onStatusChanged(p) return nil } func (p *Peer) connectionsFor(direction connectionDirection) *[]*Connection { if direction == inbound { return &p.inboundConnections } return &p.outboundConnections } // removeConnection will check remove the connection if it exists on connsPtr // and returns whether it removed the connection. func (p *Peer) removeConnection(connsPtr *[]*Connection, changed *Connection) bool { conns := *connsPtr for i, c := range conns { if c == changed { // Remove the connection by moving the last item forward, and slicing the list. last := len(conns) - 1 conns[i], conns[last] = conns[last], nil *connsPtr = conns[:last] return true } } return false } // connectionStateChanged is called when one of the peers' connections states changes. // All non-active connections are removed from the peer. The connection will // still be tracked by the channel until it's completely closed. func (p *Peer) connectionCloseStateChange(changed *Connection) { if changed.IsActive() { return } p.Lock() found := p.removeConnection(&p.inboundConnections, changed) if !found { found = p.removeConnection(&p.outboundConnections, changed) } p.Unlock() if found { p.onClosedConnRemoved(p) // Inform third parties that a peer lost a connection. p.onStatusChanged(p) } } // Connect adds a new outbound connection to the peer. func (p *Peer) Connect(ctx context.Context) (*Connection, error) { return p.channel.Connect(ctx, p.hostPort) } // BeginCall starts a new call to this specific peer, returning an OutboundCall that can // be used to write the arguments of the call. 
func (p *Peer) BeginCall(ctx context.Context, serviceName, methodName string, callOptions *CallOptions) (*OutboundCall, error) { if callOptions == nil { callOptions = defaultCallOptions } callOptions.RequestState.AddSelectedPeer(p.HostPort()) if err := validateCall(ctx, serviceName, methodName, callOptions); err != nil { return nil, err } conn, err := p.GetConnection(ctx) if err != nil { return nil, err } call, err := conn.beginCall(ctx, serviceName, methodName, callOptions) if err != nil { return nil, err } return call, err } // NumConnections returns the number of inbound and outbound connections for this peer. func (p *Peer) NumConnections() (inbound int, outbound int) { p.RLock() inbound = len(p.inboundConnections) outbound = len(p.outboundConnections) p.RUnlock() return inbound, outbound } // NumPendingOutbound returns the number of pending outbound calls. func (p *Peer) NumPendingOutbound() int { count := 0 p.RLock() for _, c := range p.outboundConnections { count += c.outbound.count() } for _, c := range p.inboundConnections { count += c.outbound.count() } p.RUnlock() return count } func (p *Peer) runWithConnections(f func(*Connection)) { p.RLock() for _, c := range p.inboundConnections { f(c) } for _, c := range p.outboundConnections { f(c) } p.RUnlock() } func (p *Peer) callOnUpdateComplete() { p.RLock() f := p.onUpdate p.RUnlock() if f != nil { f(p) } } func noopOnStatusChanged(*Peer) {} // isEphemeralHostPort returns if hostPort is the default ephemeral hostPort. func isEphemeralHostPort(hostPort string) bool { return hostPort == "" || hostPort == ephemeralHostPort || strings.HasSuffix(hostPort, ":0") }