swim/ping_request_sender.go (139 lines of code) (raw):

// Copyright (c) 2015 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package swim import ( "errors" "sync" "time" log "github.com/uber-common/bark" "github.com/uber/ringpop-go/logging" "github.com/uber/ringpop-go/shared" "github.com/uber/tchannel-go/json" ) // A PingRequest is used to make a ping request to a remote node type pingRequest struct { Source string `json:"source"` SourceIncarnation int64 `json:"sourceIncarnationNumber"` Target string `json:"target"` Checksum uint32 `json:"checksum"` Changes []Change `json:"changes"` } // A PingRequestSender is used to make a ping request to a remote node type pingRequestSender struct { node *Node peer string target string timeout time.Duration logger log.Logger } // NewPingRequestSender returns a new PingRequestSender func newPingRequestSender(node *Node, peer, target string, timeout time.Duration) *pingRequestSender { p := &pingRequestSender{ node: node, peer: peer, target: target, timeout: timeout, logger: logging.Logger("ping").WithField("local", node.Address()), } return p } func (p *pingRequestSender) SendPingRequest() (*pingResponse, error) { p.logger.WithFields(log.Fields{ "peer": p.peer, "target": p.target, }).Debug("ping request send") ctx, cancel := shared.NewTChannelContext(p.timeout) defer cancel() var res pingResponse select { case err := <-p.MakeCall(ctx, &res): if err == nil { p.node.memberlist.Update(res.Changes) } return &res, err case <-ctx.Done(): // call timed out return nil, errors.New("ping request timed out") } } func (p *pingRequestSender) MakeCall(ctx json.Context, res *pingResponse) <-chan error { errC := make(chan error, 1) go func() { defer close(errC) changes, bumpPiggybackCounters := p.node.disseminator.IssueAsSender() req := &pingRequest{ Source: p.node.Address(), SourceIncarnation: p.node.Incarnation(), Checksum: p.node.memberlist.Checksum(), Changes: changes, Target: p.target, } peer := p.node.channel.Peers().GetOrAdd(p.peer) err := json.CallPeer(ctx, peer, p.node.service, "/protocol/ping-req", req, &res) if err != nil { bumpPiggybackCounters() errC <- err return } errC <- nil }() return errC } // indirectPing is used to check if a target node can be reached indirectly. // The indirectPing is performed by sending a specifiable amount of ping // requests nodes in n's membership. func indirectPing(n *Node, target string, amount int, timeout time.Duration) (reached bool, errs []error) { resCh := sendPingRequests(n, target, amount, timeout) // wait for responses from the ping-reqs for result := range resCh { switch res := result.(type) { case *pingResponse: if res.Ok { return true, errs } // If the ping to the target was not-ok we want to wait for more results. case error: errs = append(errs, res) } } return false, errs } // sendPingRequests sends ping requests to the target address and returns a channel //containing the responses. Responses can be one of type: // (1) error: if the call to peer failed // (2) PingResponse: if the peer performed the ping request func sendPingRequests(node *Node, target string, size int, timeout time.Duration) <-chan interface{} { var peerAddresses []string peers := node.memberlist.RandomPingableMembers(size, map[string]bool{target: true}) for _, peer := range peers { peerAddresses = append(peerAddresses, peer.Address) } node.EmitEvent(PingRequestsSendEvent{ Local: node.Address(), Target: target, Peers: peerAddresses, }) var wg sync.WaitGroup resC := make(chan interface{}, size) for _, peer := range peers { wg.Add(1) go func(peer Member) { defer wg.Done() p := newPingRequestSender(node, peer.Address, target, timeout) p.logger.WithFields(log.Fields{ "peer": peer.Address, "target": p.target, }).Debug("sending ping request") var startTime = time.Now() res, err := p.SendPingRequest() if err != nil { node.EmitEvent(PingRequestSendErrorEvent{ Local: node.Address(), Target: target, Peers: peerAddresses, Peer: peer.Address, }) resC <- err return } node.EmitEvent(PingRequestsSendCompleteEvent{ Local: node.Address(), Target: target, Peers: peerAddresses, Peer: peer.Address, Duration: time.Now().Sub(startTime), }) resC <- res }(peer) } // wait for all sends to complete before closing channel go func() { wg.Wait() close(resC) }() return resC }