controller/handlers/placement.go (192 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package handlers
import (
"net/http"
"github.com/gorilla/mux"
"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/cluster/placement"
apiCom "github.com/uber/aresdb/api/common"
mutatorCom "github.com/uber/aresdb/controller/mutators/common"
"github.com/uber/aresdb/utils"
)
// PlacementHandler handles placement requests. Every endpoint it serves
// delegates the actual placement read or mutation to the wrapped
// PlacementMutator.
type PlacementHandler struct {
// placementMutator performs the placement operations behind each endpoint.
placementMutator mutatorCom.PlacementMutator
}
// NewPlacementHandler returns a PlacementHandler (by value) whose endpoints
// are backed by the given mutator.
func NewPlacementHandler(mutator mutatorCom.PlacementMutator) PlacementHandler {
	var handler PlacementHandler
	handler.placementMutator = mutator
	return handler
}
// Register attaches every placement endpoint to router. Each handler is
// passed through utils.ApplyHTTPWrappers with the supplied wrappers before it
// is registered, and each route is restricted to a single HTTP method.
//
// Routes are registered in the order listed below.
func (h PlacementHandler) Register(router *mux.Router, wrappers ...utils.HTTPHandlerWrapper) {
	endpoints := []struct {
		path    string
		method  string
		handler func(*utils.ResponseWriter, *http.Request)
	}{
		{"/{namespace}/datanode", http.MethodGet, h.Get},
		{"/{namespace}/datanode/available", http.MethodPost, h.MarkNamespaceAvailable},
		{"/{namespace}/datanode/init", http.MethodPost, h.Init},
		{"/{namespace}/datanode/instances", http.MethodPost, h.Add},
		{"/{namespace}/datanode/instances", http.MethodPut, h.Replace},
		{"/{namespace}/datanode/instances", http.MethodDelete, h.Remove},
		{"/{namespace}/datanode/instances/{instance}/available", http.MethodPost, h.MarkInstanceAvailable},
	}

	for _, ep := range endpoints {
		router.HandleFunc(ep.path, utils.ApplyHTTPWrappers(ep.handler, wrappers...)).Methods(ep.method)
	}
}
// newInstancesFromProto converts a slice of placement instance protos into
// placement.Instance values, preserving order.
//
// It returns the first conversion error encountered, in which case the
// returned slice is nil.
func newInstancesFromProto(instancepbs []placementpb.Instance) ([]placement.Instance, error) {
	instances := make([]placement.Instance, 0, len(instancepbs))
	for i := range instancepbs {
		// Point at the slice element itself instead of the range variable:
		// this avoids copying every proto struct and never hands out the
		// address of a loop variable that is reused across iterations on
		// pre-1.22 toolchains.
		// NOTE(review): assumes NewInstanceFromProto only reads from the
		// proto it is given — confirm against the m3 implementation.
		instance, err := placement.NewInstanceFromProto(&instancepbs[i])
		if err != nil {
			return nil, err
		}
		instances = append(instances, instance)
	}
	return instances, nil
}
// Init initializes a new placement for the requested namespace.
//
// The request is decoded into an InitPlacementRequest and its NewInstances
// are converted from proto form; a failure in either step is reported as
// 400 Bad Request. The shard count, replica count and instances are then
// handed to the mutator's BuildInitialPlacement; an error from that call is
// reported through WriteError, otherwise the resulting placement is written
// back to the client.
func (h *PlacementHandler) Init(rw *utils.ResponseWriter, r *http.Request) {
	var req InitPlacementRequest
	if err := apiCom.ReadRequest(r, &req, rw.SetRequest); err != nil {
		rw.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	instances, err := newInstancesFromProto(req.Body.NewInstances)
	if err != nil {
		rw.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	initial, err := h.placementMutator.BuildInitialPlacement(req.Namespace, req.Body.NumShards, req.Body.NumReplica, instances)
	if err != nil {
		rw.WriteError(err)
		return
	}

	respondWithPlacement(initial, rw)
}
// Get returns the current placement of the requested namespace.
//
// An undecodable request yields 400 Bad Request. When the mutator reports a
// non-existence error (mutatorCom.IsNonExist) the response is 404 Not Found;
// any other mutator error is reported through WriteError.
func (h *PlacementHandler) Get(rw *utils.ResponseWriter, r *http.Request) {
	var req NamespaceRequest
	if err := apiCom.ReadRequest(r, &req, rw.SetRequest); err != nil {
		rw.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	current, err := h.placementMutator.GetCurrentPlacement(req.Namespace)
	switch {
	case err == nil:
		respondWithPlacement(current, rw)
	case mutatorCom.IsNonExist(err):
		rw.WriteErrorWithCode(http.StatusNotFound, err)
	default:
		rw.WriteError(err)
	}
}
// Add adds the instances listed in the request body to the namespace's
// placement.
//
// An undecodable request or an instance that cannot be converted from proto
// yields 400 Bad Request. A non-existence error from the mutator yields
// 404 Not Found; any other mutator error is reported through WriteError. On
// success the updated placement is written back.
func (h *PlacementHandler) Add(rw *utils.ResponseWriter, r *http.Request) {
	var req AddInstancesRequest
	if err := apiCom.ReadRequest(r, &req, rw.SetRequest); err != nil {
		rw.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	added, err := newInstancesFromProto(req.Body.NewInstances)
	if err != nil {
		rw.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	// NOTE(review): the mutator is presumably what validates that all shards
	// in the placement are available before instances are added — that check
	// is not visible in this handler.
	updated, err := h.placementMutator.AddInstance(req.Namespace, added)
	switch {
	case err == nil:
		respondWithPlacement(updated, rw)
	case mutatorCom.IsNonExist(err):
		rw.WriteErrorWithCode(http.StatusNotFound, err)
	default:
		rw.WriteError(err)
	}
}
// Replace swaps the leaving instances named in the request body for the new
// instances supplied alongside them.
//
// An undecodable request or an instance that cannot be converted from proto
// yields 400 Bad Request. A non-existence error from the mutator yields
// 404 Not Found; any other mutator error is reported through WriteError. On
// success the updated placement is written back.
func (h *PlacementHandler) Replace(rw *utils.ResponseWriter, r *http.Request) {
	var req ReplaceInstanceRequest
	if err := apiCom.ReadRequest(r, &req, rw.SetRequest); err != nil {
		rw.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	replacements, err := newInstancesFromProto(req.Body.NewInstances)
	if err != nil {
		rw.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	// NOTE(review): the mutator is presumably what validates that all shards
	// are available before instances are replaced — that check is not
	// visible in this handler.
	updated, err := h.placementMutator.ReplaceInstance(req.Namespace, req.Body.LeavingInstances, replacements)
	switch {
	case err == nil:
		respondWithPlacement(updated, rw)
	case mutatorCom.IsNonExist(err):
		rw.WriteErrorWithCode(http.StatusNotFound, err)
	default:
		rw.WriteError(err)
	}
}
// Remove removes a single instance from the namespace's placement.
//
// An undecodable request yields 400 Bad Request. The request body must name
// exactly one leaving instance; otherwise ErrRemoveOneInstance is reported
// through WriteError. A non-existence error from the mutator yields
// 404 Not Found; any other mutator error is reported through WriteError. On
// success the updated placement is written back.
func (h *PlacementHandler) Remove(rw *utils.ResponseWriter, r *http.Request) {
	var req RemoveInstanceRequest
	if err := apiCom.ReadRequest(r, &req, rw.SetRequest); err != nil {
		rw.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	// Only single-instance removal is accepted by this endpoint.
	leaving := req.Body.LeavingInstances
	if len(leaving) != 1 {
		rw.WriteError(ErrRemoveOneInstance)
		return
	}

	// NOTE(review): the mutator is presumably what validates that all shards
	// are available before the instance is removed — that check is not
	// visible in this handler.
	updated, err := h.placementMutator.RemoveInstance(req.Namespace, leaving)
	switch {
	case err == nil:
		respondWithPlacement(updated, rw)
	case mutatorCom.IsNonExist(err):
		rw.WriteErrorWithCode(http.StatusNotFound, err)
	default:
		rw.WriteError(err)
	}
}
// MarkNamespaceAvailable asks the mutator to mark the whole namespace
// placement (all instances/shards) as available.
//
// An undecodable request yields 400 Bad Request. A non-existence error from
// the mutator yields 404 Not Found; any other mutator error is reported
// through WriteError. On success the updated placement is written back.
func (h *PlacementHandler) MarkNamespaceAvailable(rw *utils.ResponseWriter, r *http.Request) {
	var req NamespaceRequest
	if err := apiCom.ReadRequest(r, &req, rw.SetRequest); err != nil {
		rw.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	updated, err := h.placementMutator.MarkNamespaceAvailable(req.Namespace)
	switch {
	case err == nil:
		respondWithPlacement(updated, rw)
	case mutatorCom.IsNonExist(err):
		rw.WriteErrorWithCode(http.StatusNotFound, err)
	default:
		rw.WriteError(err)
	}
}
// MarkInstanceAvailable marks one instance as available.
//
// When the request body sets AllShards, the mutator's MarkInstanceAvailable
// is called for the instance; otherwise only the shards listed in the body
// are passed to MarkShardsAvailable.
//
// An undecodable request yields 400 Bad Request. A non-existence error from
// the mutator yields 404 Not Found; any other mutator error is reported
// through WriteError. On success the updated placement is written back.
func (h *PlacementHandler) MarkInstanceAvailable(rw *utils.ResponseWriter, r *http.Request) {
	var req MarkAvailableRequest
	if err := apiCom.ReadRequest(r, &req, rw.SetRequest); err != nil {
		rw.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	var (
		updated placement.Placement
		err     error
	)
	if !req.Body.AllShards {
		updated, err = h.placementMutator.MarkShardsAvailable(req.Namespace, req.Instance, req.Body.Shards)
	} else {
		updated, err = h.placementMutator.MarkInstanceAvailable(req.Namespace, req.Instance)
	}

	switch {
	case err == nil:
		respondWithPlacement(updated, rw)
	case mutatorCom.IsNonExist(err):
		rw.WriteErrorWithCode(http.StatusNotFound, err)
	default:
		rw.WriteError(err)
	}
}
// respondWithPlacement converts p to its proto form and writes it to rw as
// the response object. If the conversion fails, the error is wrapped with
// utils.StackError and reported through WriteError instead.
func respondWithPlacement(p placement.Placement, rw *utils.ResponseWriter) {
	placementProto, err := p.Proto()
	if err == nil {
		rw.WriteObject(placementProto)
		return
	}
	rw.WriteError(utils.StackError(err, "failed to marshal placement"))
}