controller/handlers/assignment.go (133 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package handlers import ( "fmt" apiCom "github.com/uber/aresdb/api/common" "net/http" "github.com/gorilla/mux" "github.com/uber/aresdb/controller/models" mutatorCom "github.com/uber/aresdb/controller/mutators/common" "github.com/uber/aresdb/utils" "go.uber.org/zap" ) // AssignmentHandler serves requests for ingestion job assignments type AssignmentHandler struct { logger *zap.SugaredLogger assignmentMutator mutatorCom.IngestionAssignmentMutator schemaMutator mutatorCom.TableSchemaMutator membershipMutator mutatorCom.MembershipMutator } // NewAssignmentHandler creates a new AssignmentHandler func NewAssignmentHandler(logger *zap.SugaredLogger, assignmentMutator mutatorCom.IngestionAssignmentMutator, schemaMutator mutatorCom.TableSchemaMutator, membershipMutator mutatorCom.MembershipMutator) AssignmentHandler { return AssignmentHandler{ logger: logger, assignmentMutator: assignmentMutator, schemaMutator: schemaMutator, membershipMutator: membershipMutator, } } // Register adds paths to router func (h AssignmentHandler) Register(router *mux.Router, wrappers ...utils.HTTPHandlerWrapper) { router.HandleFunc("/{namespace}/assignments/{subscriber}", utils.ApplyHTTPWrappers(h.GetAssignment, wrappers...)).Methods(http.MethodGet) router.HandleFunc("/{namespace}/assignments", utils.ApplyHTTPWrappers(h.GetAssignments, wrappers...)).Methods(http.MethodGet) router.HandleFunc("/{namespace}/hash/{subscriber}", utils.ApplyHTTPWrappers(h.GetHash, wrappers...)).Methods(http.MethodGet) } // GetAssignment swagger:route GET /assignment/{namespace}/assignments/{subscriber} getAssignment // gets assignment by subscriber name func (h AssignmentHandler) GetAssignment(w *utils.ResponseWriter, r *http.Request) { var req GetAssignmentRequest err := apiCom.ReadRequest(r, &req, w.SetRequest) if err != nil { w.WriteErrorWithCode(http.StatusBadRequest, err) return } assignment, err := h.assignmentMutator.GetIngestionAssignment(req.Namespace, req.Subscriber) if err != nil { statusCode := http.StatusInternalServerError if mutatorCom.IsNonExist(err) { statusCode = http.StatusBadRequest err = mutatorCom.ErrIngestionAssignmentDoesNotExist } w.WriteErrorWithCode(statusCode, err) return } for i, job := range assignment.Jobs { table, err := h.schemaMutator.GetTable(req.Namespace, job.AresTableConfig.Name) if err != nil { w.WriteError(ErrFailedToFetchTableSchemaForJobConfig) return } assignment.Jobs[i].AresTableConfig.Table = table } var instances []models.Instance if h.membershipMutator != nil { instances, err = h.membershipMutator.GetInstances(req.Namespace) if err != nil { w.WriteError(err) return } } w.WriteObject(composeSingleAssignment(instances, assignment)) } // GetAssignments swagger:route GET /assignment/{namespace}/assignments getAssignments // returns all assignments func (h AssignmentHandler) GetAssignments(w *utils.ResponseWriter, r *http.Request) { var req GetJobsRequest err := apiCom.ReadRequest(r, &req, w.SetRequest) if err != nil { w.WriteErrorWithCode(http.StatusBadRequest, err) return } assignments, err := h.assignmentMutator.GetIngestionAssignments(req.Namespace) if err != nil { statusCode := http.StatusInternalServerError if mutatorCom.IsNonExist(err) { statusCode = http.StatusBadRequest } w.WriteErrorWithCode(statusCode, err) return } results := make([]models.IngestionAssignment, len(assignments)) for i, assignment := range assignments { for i, job := range assignment.Jobs { table, err := h.schemaMutator.GetTable(req.Namespace, job.AresTableConfig.Name) if err != nil { w.WriteError(ErrFailedToFetchTableSchemaForJobConfig) return } assignment.Jobs[i].AresTableConfig.Table = table } var instances []models.Instance if h.membershipMutator != nil { instances, err = h.membershipMutator.GetInstances(req.Namespace) if err != nil { w.WriteError(err) return } } results[i] = composeSingleAssignment(instances, assignment) } w.WriteObject(results) } // GetHash swagger:route GET /assignment/{namespace}/hash/{subscriber} getAssignmentHash // returns hash that will be different if any thing changed for given assignment func (h AssignmentHandler) GetHash(w *utils.ResponseWriter, r *http.Request) { var req GetAssignmentHashRequest err := apiCom.ReadRequest(r, &req, w.SetRequest) if err != nil { w.WriteErrorWithCode(http.StatusBadRequest, err) return } hash, err := h.assignmentMutator.GetHash(req.Namespace, req.Subscriber) if err != nil { statusCode := http.StatusInternalServerError if mutatorCom.IsNonExist(err) { statusCode = http.StatusNotFound } w.WriteErrorWithCode(statusCode, err) return } w.WriteJSONBytesWithCode(http.StatusOK, []byte(hash), nil) } func composeSingleAssignment(instances []models.Instance, assignment models.IngestionAssignment) models.IngestionAssignment { instanceViews := make(map[string]models.Instance, len(instances)) for _, instance := range instances { instance.Address = fmt.Sprintf("%s:%d", instance.Host, instance.Port) instanceViews[instance.Name] = instance } return models.IngestionAssignment{ Subscriber: assignment.Subscriber, Jobs: assignment.Jobs, Instances: instanceViews, } }