swim/handlers.go (108 lines of code) (raw):
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package swim
import (
"errors"
log "github.com/uber-common/bark"
"github.com/uber/tchannel-go/json"
"golang.org/x/net/context"
)
// emptyArg is a blank arguments used as filler for making TChannel calls that
// require nothing to be passed to TChannel's arg3.
type emptyArg struct{}
// Endpoint is an identifier for an internal swim endpoint
type Endpoint string
const (
// PingEndpoint is the identifier for /protocol/ping
PingEndpoint Endpoint = "ping"
// PingReqEndpoint is the identifier for /protocol/ping-req
PingReqEndpoint Endpoint = "ping-req"
)
// Status contains a status string of the response from a handler.
type Status struct {
Status string `json:"status"`
}
// HealResponse contains a list of nodes where healing was attempted
type HealResponse struct {
Targets []string `json:"targets"`
Error string `json:"error"`
}
// notImplementedHandler is a dummy handler that returns an error explaining
// this method is not implemented.
func notImplementedHandler(ctx json.Context, req *emptyArg) (*emptyArg, error) {
return nil, errors.New("handler not implemented")
}
func (n *Node) registerHandlers() error {
handlers := map[string]interface{}{
"/protocol/join": n.joinHandler,
"/protocol/ping": n.pingHandler,
"/protocol/ping-req": n.pingRequestHandler,
"/admin/debugSet": notImplementedHandler,
"/admin/debugClear": notImplementedHandler,
"/admin/gossip": n.gossipHandler, // Deprecated
"/admin/gossip/start": n.gossipHandlerStart,
"/admin/gossip/stop": n.gossipHandlerStop,
"/admin/healpartition/disco": n.discoverProviderHealerHandler,
"/admin/tick": n.tickHandler, // Deprecated
"/admin/gossip/tick": n.tickHandler,
"/admin/member/leave": n.adminLeaveHandler,
"/admin/member/join": n.adminJoinHandler,
"/admin/reap": n.reapFaultyMembersHandler,
}
return json.Register(n.channel, handlers, n.errorHandler)
}
func (n *Node) joinHandler(ctx json.Context, req *joinRequest) (*joinResponse, error) {
res, err := handleJoin(n, req)
if err != nil {
n.logger.WithFields(log.Fields{
"error": err,
"joinRequest": req,
}).Debug("invalid join request received")
return nil, err
}
return res, nil
}
func (n *Node) pingHandler(ctx json.Context, req *ping) (*ping, error) {
return handlePing(n, req)
}
func (n *Node) pingRequestHandler(ctx json.Context, req *pingRequest) (*pingResponse, error) {
return handlePingRequest(n, req)
}
func (n *Node) gossipHandler(ctx json.Context, req *emptyArg) (*emptyArg, error) {
switch n.gossip.Stopped() {
case true:
n.gossip.Start()
case false:
n.gossip.Stop()
}
return &emptyArg{}, nil
}
func (n *Node) gossipHandlerStart(ctx json.Context, req *emptyArg) (*emptyArg, error) {
n.gossip.Start()
return &emptyArg{}, nil
}
func (n *Node) gossipHandlerStop(ctx json.Context, req *emptyArg) (*emptyArg, error) {
n.gossip.Stop()
return &emptyArg{}, nil
}
func (n *Node) discoverProviderHealerHandler(ctx json.Context, req *emptyArg) (*HealResponse, error) {
targets, err := n.healer.Heal()
msg := ""
if err != nil {
msg = err.Error()
}
return &HealResponse{Targets: targets, Error: msg}, nil
}
func (n *Node) tickHandler(ctx json.Context, req *emptyArg) (*ping, error) {
n.gossip.ProtocolPeriod()
return &ping{Checksum: n.memberlist.Checksum()}, nil
}
func (n *Node) adminJoinHandler(ctx json.Context, req *emptyArg) (*Status, error) {
n.memberlist.SetLocalStatus(Alive)
return &Status{Status: "rejoined"}, nil
}
func (n *Node) adminLeaveHandler(ctx json.Context, req *emptyArg) (*Status, error) {
n.memberlist.SetLocalStatus(Leave)
return &Status{Status: "ok"}, nil
}
// reapFaultyMembersHandler iterates through the local members of this nodes and
// declares all the members marked as faulty as a tombstone. This will clean all
// these members from the membership in the complete cluster due to the gossipy
// nature of swim
func (n *Node) reapFaultyMembersHandler(ctx json.Context, req *emptyArg) (*Status, error) {
members := n.memberlist.GetMembers()
for _, member := range members {
if member.Status == Faulty {
// declare all faulty members as tombstone
n.memberlist.MakeTombstone(member.Address, member.Incarnation)
}
}
return &Status{Status: "ok"}, nil
}
// errorHandler is called when one of the handlers returns an error.
func (n *Node) errorHandler(ctx context.Context, err error) {
n.logger.WithField("error", err).Info("error occurred")
}