datanode/peer_source.go (186 lines of code) (raw):
package datanode
import (
"context"
"errors"
"fmt"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/uber/aresdb/cluster/topology"
"github.com/uber/aresdb/datanode/client"
"github.com/uber/aresdb/datanode/generated/proto/rpc"
"github.com/uber/aresdb/utils"
"google.golang.org/grpc"
"net/url"
"sync"
)
var (
errPeerClosed = errors.New("peer closed")
errPeerNotExist = errors.New("peer does not exist")
)
var grpcDialer = func(target string, opts ...grpc.DialOption) (client rpc.PeerDataNodeClient, closeFn func() error, err error) {
conn, err := grpc.Dial(target, opts...)
if err != nil {
return nil, nil, err
}
return rpc.NewPeerDataNodeClient(conn), func() error { return conn.Close() }, nil
}
type peer struct {
sync.RWMutex
sync.WaitGroup
host topology.Host
dialer client.PeerConnDialer
conn rpc.PeerDataNodeClient
closeFn func() error
closed bool
}
func (p *peer) Host() topology.Host {
return p.host
}
// BorrowConnection from peer
func (p *peer) BorrowConnection(fn client.WithConnectionFn) (err error) {
p.Lock()
if p.closed {
p.Unlock()
return errPeerClosed
}
if p.conn == nil {
parsedURL, err := url.Parse(p.host.Address())
if err != nil {
p.Unlock()
return err
}
p.conn, p.closeFn, err = p.dialer(fmt.Sprintf("%s:%s", parsedURL.Hostname(), parsedURL.Port()), grpc.WithInsecure())
if err != nil {
p.Unlock()
return err
}
}
conn := p.conn
p.Add(1)
p.Unlock()
defer p.Done()
err = p.checkHealth(conn)
if err != nil {
return err
}
fn(p.host.ID(), conn)
return nil
}
func (p *peer) checkHealth(peerNodeClient rpc.PeerDataNodeClient) error {
healthCheckResponse, err := peerNodeClient.Health(context.Background(), &rpc.HealthCheckRequest{Service: "PeerData"})
if err != nil {
return xerrors.Wrapf(err, "failed to check health of %s", p.host.ID())
}
status := healthCheckResponse.GetStatus()
if status != rpc.HealthCheckResponse_SERVING {
return fmt.Errorf("unhealthy peer id: %s, status: %s", p.host.ID(), status.String())
}
return nil
}
// Close close the peer
func (p *peer) Close() {
utils.GetLogger().With("host", p.host.String()).Info("closing peer connection")
// waiting for on going operation with client connection
p.Wait()
p.Lock()
defer p.Unlock()
if p.conn != nil && p.closeFn != nil {
err := p.closeFn()
if err != nil {
utils.GetLogger().With("host", p.host.String(), "error", err.Error()).Error("failed to close grpc connection")
}
p.closed = true
p.conn = nil
}
}
// newPeer create a new peer object
func newPeer(host topology.Host, dialer client.PeerConnDialer) *peer {
return &peer{
host: host,
dialer: dialer,
}
}
type peerSource struct {
sync.RWMutex
watch topology.MapWatch
peers map[string]*peer
dialer client.PeerConnDialer
done chan struct{}
}
func (ps *peerSource) borrowConnection(hostID string, fn client.WithConnectionFn) error {
ps.RLock()
peer, exist := ps.peers[hostID]
if !exist {
ps.RUnlock()
return errPeerNotExist
}
ps.RUnlock()
return peer.BorrowConnection(fn)
}
func (ps *peerSource) BorrowConnection(hostIDs []string, fn client.WithConnectionFn) error {
multiError := xerrors.NewMultiError()
for _, hostID := range hostIDs {
err := ps.borrowConnection(hostID, fn)
if err != nil {
utils.GetLogger().With("peer", hostID, "error", err.Error()).Warn("failed to borrow connection from peer")
multiError = multiError.Add(err)
} else {
utils.GetLogger().With("peer", hostID).Debug("successfully borrowed connection from peer")
return nil
}
}
return multiError.FinalError()
}
func (ps *peerSource) watchTopoChange() {
for {
select {
case <-ps.watch.C():
ps.updateTopoMap(ps.watch.Get())
case <-ps.done:
return
}
}
}
func (ps *peerSource) Close() {
close(ps.done)
ps.Lock()
defer ps.Unlock()
for _, peer := range ps.peers {
peer.Close()
}
}
func (ps *peerSource) updateTopoMap(topoMap topology.Map) {
ps.Lock()
defer ps.Unlock()
// unknown host
knownHosts := make(map[string]struct{})
for _, host := range topoMap.Hosts() {
knownHosts[host.ID()] = struct{}{}
if _, exist := ps.peers[host.ID()]; !exist {
ps.peers[host.ID()] = newPeer(host, ps.dialer)
}
}
for hostID, peer := range ps.peers {
if _, known := knownHosts[hostID]; !known {
delete(ps.peers, hostID)
go func() {
peer.Close()
}()
}
}
}
// NewPeerSource creates PeerSource
func NewPeerSource(topo topology.Topology, dialerOverride client.PeerConnDialer) (client.PeerSource, error) {
dialer := grpcDialer
if dialerOverride != nil {
dialer = dialerOverride
}
mapWatch, err := topo.Watch()
if err != nil {
return nil, err
}
<-mapWatch.C()
topoMap := mapWatch.Get()
ps := &peerSource{
watch: mapWatch,
dialer: dialer,
done: make(chan struct{}),
peers: make(map[string]*peer),
}
ps.updateTopoMap(topoMap)
go ps.watchTopoChange()
return ps, nil
}