controller/handlers/config.go (205 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package handlers
import (
"net/http"
"github.com/gorilla/mux"
"github.com/m3db/m3/src/cluster/services"
"github.com/uber-go/tally"
apiCom "github.com/uber/aresdb/api/common"
"github.com/uber/aresdb/cluster/kvstore"
mutatorCom "github.com/uber/aresdb/controller/mutators/common"
"github.com/uber/aresdb/utils"
aresUtils "github.com/uber/aresdb/utils"
"go.uber.org/fx"
"go.uber.org/zap"
)
// ConfigHandlerParams defines params needed to initialize ConfigHandler.
// It embeds fx.In; NewConfigHandler copies every field below into the
// corresponding unexported field of ConfigHandler.
type ConfigHandlerParams struct {
fx.In
// Logger becomes ConfigHandler.logger.
Logger *zap.SugaredLogger
// Scope becomes ConfigHandler.scope.
Scope tally.Scope
// JobMutator is the store used for job config reads and writes.
JobMutator mutatorCom.JobMutator
// SchemaMutator is used to look up the table schema referenced by a job.
SchemaMutator mutatorCom.TableSchemaMutator
// EtcdClient is used to look up placements; getNumShards tolerates nil.
EtcdClient *kvstore.EtcdClient
}
// ConfigHandler serves requests for job configurations.
// Its HTTP routes are attached to a router by Register.
type ConfigHandler struct {
// logger is taken from ConfigHandlerParams.Logger.
logger *zap.SugaredLogger
// scope is taken from ConfigHandlerParams.Scope.
// NOTE(review): no method in this file reads it — confirm whether it is used elsewhere.
scope tally.Scope
// jobMutator backs GetJob, GetJobs, AddJob, UpdateJob, DeleteJob and GetHash.
jobMutator mutatorCom.JobMutator
// schemaMutator is queried for the table of each job returned by GetJob/GetJobs.
schemaMutator mutatorCom.TableSchemaMutator
// etcdClient provides the placement service used by getNumShards; may be nil,
// in which case getNumShards reports 0 shards.
etcdClient *kvstore.EtcdClient
}
// NewConfigHandler creates a new ConfigHandler, copying each dependency from
// p into the matching unexported field.
func NewConfigHandler(p ConfigHandlerParams) ConfigHandler {
	var handler ConfigHandler

	handler.logger = p.Logger
	handler.scope = p.Scope
	handler.jobMutator = p.JobMutator
	handler.schemaMutator = p.SchemaMutator
	handler.etcdClient = p.EtcdClient

	return handler
}
// Register adds paths to router. Every handler is passed through
// utils.ApplyHTTPWrappers with the given wrappers before being attached, and
// each route is restricted to a single HTTP method.
func (h ConfigHandler) Register(router *mux.Router, wrappers ...utils.HTTPHandlerWrapper) {
	// Routes are listed in the order in which they are registered.
	routes := []struct {
		path    string
		method  string
		handler func(*utils.ResponseWriter, *http.Request)
	}{
		{"/{namespace}/jobs/{job}", http.MethodGet, h.GetJob},
		{"/{namespace}/jobs", http.MethodGet, h.GetJobs},
		{"/{namespace}/jobs/{job}", http.MethodDelete, h.DeleteJob},
		{"/{namespace}/jobs/{job}", http.MethodPut, h.UpdateJob},
		{"/{namespace}/jobs", http.MethodPost, h.AddJob},
		{"/{namespace}/hash", http.MethodGet, h.GetHash},
	}

	for _, rt := range routes {
		router.HandleFunc(rt.path, utils.ApplyHTTPWrappers(rt.handler, wrappers...)).Methods(rt.method)
	}
}
// getNumShards returns the number of shards in the placement of the data node
// service for namespace.
//
// If the handler has no etcd client it returns 0 and a nil error. If either
// the placement service or the placement itself cannot be obtained it returns
// 0 and ErrFailedToFetchPlacement. Because that sentinel hides the underlying
// error, the cause is written to the handler's logger (when one is set) so
// the failure can still be diagnosed.
func (h ConfigHandler) getNumShards(namespace string) (int, error) {
	if h.etcdClient == nil {
		return 0, nil
	}

	// reportCause records the root cause that the sentinel error does not carry.
	reportCause := func(step string, cause error) {
		if h.logger != nil {
			h.logger.Errorw("failed to fetch placement",
				"step", step,
				"namespace", namespace,
				"error", cause)
		}
	}

	serviceID := services.NewServiceID().
		SetEnvironment(h.etcdClient.Environment).
		SetZone(h.etcdClient.Zone).
		SetName(aresUtils.DataNodeServiceName(namespace))

	placementSvc, err := h.etcdClient.Services.PlacementService(serviceID, nil)
	if err != nil {
		reportCause("placement service", err)
		return 0, ErrFailedToFetchPlacement
	}

	plmt, err := placementSvc.Placement()
	if err != nil {
		reportCause("placement", err)
		return 0, ErrFailedToFetchPlacement
	}

	return plmt.NumShards(), nil
}
// GetJob swagger:route GET /config/{namespace}/jobs/{job} getJob
// gets job config by name
func (h ConfigHandler) GetJob(w *utils.ResponseWriter, r *http.Request) {
	var req GetJobRequest
	if err := apiCom.ReadRequest(r, &req, w.SetRequest); err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	job, err := h.jobMutator.GetJob(req.Namespace, req.JobName)
	if err != nil {
		// A missing job is reported as 404 with the canonical error; anything
		// else is surfaced as a 500 with the mutator's own error.
		if mutatorCom.IsNonExist(err) {
			w.WriteErrorWithCode(http.StatusNotFound, mutatorCom.ErrJobConfigDoesNotExist)
			return
		}
		w.WriteErrorWithCode(http.StatusInternalServerError, err)
		return
	}

	schema, err := h.schemaMutator.GetTable(req.Namespace, job.AresTableConfig.Name)
	switch {
	case err == nil:
		job.AresTableConfig.Table = schema
	case !mutatorCom.IsNonExist(err):
		// A non-existent table is tolerated (the job is returned without a
		// schema); any other lookup failure aborts the request.
		w.WriteErrorWithCode(
			http.StatusInternalServerError,
			ErrFailedToFetchTableSchemaForJobConfig)
		return
	}

	shardCount, err := h.getNumShards(req.Namespace)
	if err != nil {
		w.WriteError(err)
		return
	}
	job.NumShards = shardCount

	w.WriteObject(job)
}
// GetJobs swagger:route GET /config/{namespace}/jobs getJobs
// returns all jobs config
func (h ConfigHandler) GetJobs(w *utils.ResponseWriter, r *http.Request) {
	var req GetJobsRequest
	if err := apiCom.ReadRequest(r, &req, w.SetRequest); err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	jobs, err := h.jobMutator.GetJobs(req.Namespace)
	if err != nil {
		if mutatorCom.IsNonExist(err) {
			w.WriteErrorWithCode(http.StatusNotFound, mutatorCom.ErrJobConfigDoesNotExist)
			return
		}
		w.WriteErrorWithCode(http.StatusInternalServerError, err)
		return
	}

	// The shard count is looked up once and stamped onto every job below.
	shardCount, err := h.getNumShards(req.Namespace)
	if err != nil {
		w.WriteError(err)
		return
	}

	for i := range jobs {
		jobs[i].NumShards = shardCount

		schema, err := h.schemaMutator.GetTable(req.Namespace, jobs[i].AresTableConfig.Name)
		switch {
		case err == nil:
			jobs[i].AresTableConfig.Table = schema
		case !mutatorCom.IsNonExist(err):
			// only return system error when error is not NonExist
			w.WriteErrorWithCode(http.StatusInternalServerError, ErrFailedToFetchTableSchemaForJobConfig)
			return
		}
	}

	w.WriteObject(jobs)
}
// DeleteJob swagger:route DELETE /config/{namespace}/jobs/{job} deleteJob
// deletes a job
func (h ConfigHandler) DeleteJob(w *utils.ResponseWriter, r *http.Request) {
	var req DeleteJobRequest
	if err := apiCom.ReadRequest(r, &req, w.SetRequest); err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	if err := h.jobMutator.DeleteJob(req.Namespace, req.JobName); err != nil {
		// Deleting a job that does not exist is treated as a client error;
		// every other failure is a server error.
		if err == mutatorCom.ErrJobConfigDoesNotExist {
			w.WriteErrorWithCode(http.StatusBadRequest, err)
		} else {
			w.WriteErrorWithCode(http.StatusInternalServerError, err)
		}
		return
	}

	w.WriteObject(nil)
}
// UpdateJob swagger:route PUT /config/{namespace}/jobs/{job} updateJob
// updates job config
//
// Consumes:
// - application/json
func (h ConfigHandler) UpdateJob(w *utils.ResponseWriter, r *http.Request) {
	var req UpdateJobRequest
	if err := apiCom.ReadRequest(r, &req, w.SetRequest); err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	// Any mutator failure is reported as a 500; on success the submitted
	// body is echoed back to the caller.
	if err := h.jobMutator.UpdateJob(req.Namespace, req.Body); err != nil {
		w.WriteErrorWithCode(http.StatusInternalServerError, err)
		return
	}

	w.WriteObject(req.Body)
}
// AddJob swagger:route POST /config/{namespace}/jobs addJob
// adds a new job
//
// Consumes:
// - application/json
func (h ConfigHandler) AddJob(w *utils.ResponseWriter, r *http.Request) {
	var req AddJobRequest
	if err := apiCom.ReadRequest(r, &req, w.SetRequest); err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	if err := h.jobMutator.AddJob(req.Namespace, req.Body); err != nil {
		code := http.StatusInternalServerError
		switch err {
		case mutatorCom.ErrNamespaceDoesNotExist, mutatorCom.ErrJobConfigAlreadyExist:
			// Unknown namespace or duplicate job: the request itself is at fault.
			code = http.StatusBadRequest
		}
		w.WriteErrorWithCode(code, err)
		return
	}

	w.WriteObject(nil)
}
// GetHash swagger:route GET /config/{namespace}/hash getJobsHash
// returns hash that will be different if any job changed
func (h ConfigHandler) GetHash(w *utils.ResponseWriter, r *http.Request) {
	var req GetHashRequest
	if err := apiCom.ReadRequest(r, &req, w.SetRequest); err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	hash, err := h.jobMutator.GetHash(req.Namespace)
	if err != nil {
		if mutatorCom.IsNonExist(err) {
			w.WriteErrorWithCode(http.StatusNotFound, mutatorCom.ErrJobConfigDoesNotExist)
			return
		}
		w.WriteErrorWithCode(http.StatusInternalServerError, err)
		return
	}

	// The hash is written as-is as the response body with a 200 status.
	body := []byte(hash)
	w.WriteJSONBytesWithCode(http.StatusOK, body, nil)
}