controller/mutators/etcd/placement_mutator.go (167 lines of code) (raw):
package etcd
import (
"fmt"
"github.com/m3db/m3/src/cluster/kv"
"github.com/m3db/m3/src/cluster/placement"
"github.com/m3db/m3/src/cluster/services"
"github.com/m3db/m3/src/x/instrument"
"github.com/uber/aresdb/cluster/kvstore"
"github.com/uber/aresdb/controller/mutators/common"
"github.com/uber/aresdb/utils"
"net/http"
)
// placementMutator carries out the placement operations defined in this file
// through an etcd client.
type placementMutator struct {
// client provides the Environment, Zone and Services handle that the
// mutator's methods read when building service IDs and placement services.
client *kvstore.EtcdClient
}
// getServiceID returns the service ID used to address the placement of the
// given namespace. Environment and zone are taken from the etcd client; the
// service name is derived from the namespace via utils.DataNodeServiceName.
func (p *placementMutator) getServiceID(namespace string) services.ServiceID {
	sid := services.NewServiceID()
	sid = sid.SetEnvironment(p.client.Environment)
	sid = sid.SetZone(p.client.Zone)
	return sid.SetName(utils.DataNodeServiceName(namespace))
}
// validateAllAvailable checks every instance of the placement and returns an
// HTTP 400 APIError naming the first instance whose IsAvailable() reports
// false. It returns nil when all instances are available (including when the
// placement has no instances).
func validateAllAvailable(p placement.Placement) error {
	for _, candidate := range p.Instances() {
		if candidate.IsAvailable() {
			continue
		}
		msg := fmt.Sprintf("instance %s is not available", candidate.ID())
		return utils.APIError{Code: http.StatusBadRequest, Message: msg}
	}
	return nil
}
// NewPlacementMutator returns a common.PlacementMutator that performs its
// operations through the supplied etcd client.
func NewPlacementMutator(client *kvstore.EtcdClient) common.PlacementMutator {
	mutator := new(placementMutator)
	mutator.client = client
	return mutator
}
// checkNumShardsIsPowerOfTwo reports whether numShards is a positive power of
// two (1, 2, 4, 8, ...).
//
// A power of two has exactly one bit set, so clearing its lowest set bit with
// n&(n-1) yields zero. That expression is also zero for 0 and for the minimum
// int value, neither of which is a valid shard count, so the explicit > 0
// guard is required.
func checkNumShardsIsPowerOfTwo(numShards int) bool {
	return numShards > 0 && numShards&(numShards-1) == 0
}
// placementOptions builds the base placement options shared by every
// operation of the mutator. Callers may layer further settings on top of the
// returned value.
func (p *placementMutator) placementOptions() placement.Options {
	opts := placement.NewOptions()
	opts = opts.SetInstrumentOptions(instrument.NewOptions())
	opts = opts.SetValidZone(p.client.Zone)
	// When several new instances are supplied at once, all of them should be added.
	opts = opts.SetAddAllCandidates(true)
	// For now a replacement must not affect existing instances that are not
	// themselves being replaced.
	opts = opts.SetAllowPartialReplace(false)
	return opts
}
// BuildInitialPlacement asks the placement service of the given namespace to
// build an initial placement from instances with numShards shards and
// numReplica replicas.
//
// It returns common.ErrInvalidNumShards when numShards fails the power-of-two
// check, and a stacked error when the placement service cannot be obtained or
// the build itself fails.
func (p *placementMutator) BuildInitialPlacement(namespace string, numShards int, numReplica int, instances []placement.Instance) (placement.Placement, error) {
	if valid := checkNumShardsIsPowerOfTwo(numShards); !valid {
		return nil, common.ErrInvalidNumShards
	}

	svc, svcErr := p.client.Services.PlacementService(p.getServiceID(namespace), p.placementOptions())
	if svcErr != nil {
		return nil, utils.StackError(svcErr, common.ErrMsgFailedToGetPlacementService)
	}

	initial, buildErr := svc.BuildInitialPlacement(instances, numShards, numReplica)
	if buildErr != nil {
		return nil, utils.StackError(buildErr, common.ErrMsgFailedToBuildInitialPlacement)
	}
	return initial, nil
}
// GetCurrentPlacement fetches the placement of the given namespace from its
// placement service.
//
// It returns common.ErrPlacementDoesNotExist when the lookup fails with
// kv.ErrNotFound, and a stacked error for any other failure.
func (p *placementMutator) GetCurrentPlacement(namespace string) (placement.Placement, error) {
	svc, svcErr := p.client.Services.PlacementService(p.getServiceID(namespace), p.placementOptions())
	if svcErr != nil {
		return nil, utils.StackError(svcErr, common.ErrMsgFailedToGetPlacementService)
	}

	current, err := svc.Placement()
	switch {
	case err == nil:
		return current, nil
	case err == kv.ErrNotFound:
		return nil, common.ErrPlacementDoesNotExist
	default:
		return nil, utils.StackError(err, common.ErrMsgFailedToGetCurrentPlacement)
	}
}
// AddInstance adds the given instances to the placement of the namespace and
// returns the resulting placement.
//
// The placement service is created with validateAllAvailable registered as
// the validate-before-update function. It returns
// common.ErrPlacementDoesNotExist when the operation fails with
// kv.ErrNotFound, and a stacked error for any other failure.
func (p *placementMutator) AddInstance(namespace string, instances []placement.Instance) (placement.Placement, error) {
	sid := p.getServiceID(namespace)
	opts := p.placementOptions().SetValidateFnBeforeUpdate(validateAllAvailable)

	svc, svcErr := p.client.Services.PlacementService(sid, opts)
	if svcErr != nil {
		return nil, utils.StackError(svcErr, common.ErrMsgFailedToGetPlacementService)
	}

	updated, _, err := svc.AddInstances(instances)
	if err == nil {
		return updated, nil
	}
	if err == kv.ErrNotFound {
		return nil, common.ErrPlacementDoesNotExist
	}
	return nil, utils.StackError(err, common.ErrMsgFailedToAddInstance)
}
// ReplaceInstance replaces the instances identified by leavingInstances with
// newInstances in the placement of the namespace and returns the resulting
// placement.
//
// The placement service is created with validateAllAvailable registered as
// the validate-before-update function. It returns
// common.ErrPlacementDoesNotExist when the operation fails with
// kv.ErrNotFound, and a stacked error for any other failure.
func (p *placementMutator) ReplaceInstance(namespace string, leavingInstances []string, newInstances []placement.Instance) (placement.Placement, error) {
	sid := p.getServiceID(namespace)
	opts := p.placementOptions().SetValidateFnBeforeUpdate(validateAllAvailable)

	svc, svcErr := p.client.Services.PlacementService(sid, opts)
	if svcErr != nil {
		return nil, utils.StackError(svcErr, common.ErrMsgFailedToGetPlacementService)
	}

	updated, _, err := svc.ReplaceInstances(leavingInstances, newInstances)
	switch {
	case err == nil:
		return updated, nil
	case err == kv.ErrNotFound:
		return nil, common.ErrPlacementDoesNotExist
	default:
		return nil, utils.StackError(err, common.ErrMsgFailedToReplaceInstance)
	}
}
// RemoveInstance removes the instances identified by leavingInstances from
// the placement of the namespace and returns the resulting placement.
//
// The placement service is created with validateAllAvailable registered as
// the validate-before-update function. It returns
// common.ErrPlacementDoesNotExist when the operation fails with
// kv.ErrNotFound, and a stacked error for any other failure.
func (p *placementMutator) RemoveInstance(namespace string, leavingInstances []string) (placement.Placement, error) {
	sid := p.getServiceID(namespace)
	opts := p.placementOptions().SetValidateFnBeforeUpdate(validateAllAvailable)

	svc, svcErr := p.client.Services.PlacementService(sid, opts)
	if svcErr != nil {
		return nil, utils.StackError(svcErr, common.ErrMsgFailedToGetPlacementService)
	}

	updated, err := svc.RemoveInstances(leavingInstances)
	if err == nil {
		return updated, nil
	}
	if err == kv.ErrNotFound {
		return nil, common.ErrPlacementDoesNotExist
	}
	return nil, utils.StackError(err, common.ErrMsgFailedToRemoveInstance)
}
// MarkNamespaceAvailable marks all shards in the placement of the namespace
// as available and returns the resulting placement.
//
// It returns common.ErrPlacementDoesNotExist when the operation fails with
// kv.ErrNotFound, and a stacked error for any other failure.
func (p *placementMutator) MarkNamespaceAvailable(namespace string) (placement.Placement, error) {
	svc, svcErr := p.client.Services.PlacementService(p.getServiceID(namespace), p.placementOptions())
	if svcErr != nil {
		return nil, utils.StackError(svcErr, common.ErrMsgFailedToGetPlacementService)
	}

	updated, err := svc.MarkAllShardsAvailable()
	switch {
	case err == nil:
		return updated, nil
	case err == kv.ErrNotFound:
		return nil, common.ErrPlacementDoesNotExist
	default:
		return nil, utils.StackError(err, common.ErrMsgFailedToMarkAvailable)
	}
}
// MarkInstanceAvailable marks the given instance as available in the
// placement of the namespace and returns the resulting placement.
//
// It returns common.ErrPlacementDoesNotExist when the operation fails with
// kv.ErrNotFound, and a stacked error for any other failure.
func (p *placementMutator) MarkInstanceAvailable(namespace string, instance string) (placement.Placement, error) {
	svc, svcErr := p.client.Services.PlacementService(p.getServiceID(namespace), p.placementOptions())
	if svcErr != nil {
		return nil, utils.StackError(svcErr, common.ErrMsgFailedToGetPlacementService)
	}

	updated, err := svc.MarkInstanceAvailable(instance)
	if err == nil {
		return updated, nil
	}
	if err == kv.ErrNotFound {
		return nil, common.ErrPlacementDoesNotExist
	}
	return nil, utils.StackError(err, common.ErrMsgFailedToMarkAvailable)
}
// MarkShardsAvailable marks the listed shards of the given instance as
// available in the placement of the namespace and returns the resulting
// placement.
//
// It returns common.ErrPlacementDoesNotExist when the operation fails with
// kv.ErrNotFound, and a stacked error for any other failure.
func (p *placementMutator) MarkShardsAvailable(namespace string, instance string, shards []uint32) (placement.Placement, error) {
	svc, svcErr := p.client.Services.PlacementService(p.getServiceID(namespace), p.placementOptions())
	if svcErr != nil {
		return nil, utils.StackError(svcErr, common.ErrMsgFailedToGetPlacementService)
	}

	updated, err := svc.MarkShardsAvailable(instance, shards...)
	switch {
	case err == nil:
		return updated, nil
	case err == kv.ErrNotFound:
		return nil, common.ErrPlacementDoesNotExist
	default:
		return nil, utils.StackError(err, common.ErrMsgFailedToMarkAvailable)
	}
}