vermeer/apps/master/services/http_admin.go (336 lines of code) (raw):

/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package services import ( "fmt" "net/http" "strconv" "strings" "vermeer/apps/common" "vermeer/apps/master/threshold" "vermeer/apps/master/workers" //. "vermeer/apps/master/graphs" . "vermeer/apps/master/workers" "github.com/gin-gonic/gin" ) type AdminGraphSaveHandler struct { common.SenHandler } // POST func (gh *AdminGraphSaveHandler) POST(ctx *gin.Context) { spaceName := ctx.Param("space_name") graphName := ctx.Param("graph_name") workers, err := adminBiz(ctx).SaveGraph(spaceName, graphName) if isErr(err, ctx) { return } ctx.JSON(http.StatusOK, GraphPersistenceResponse{Workers: workers}) } type AdminGraphReadHandler struct { common.SenHandler } // POST func (gh *AdminGraphReadHandler) POST(ctx *gin.Context) { spaceName := ctx.Param("space_name") graphName := ctx.Param("graph_name") workers, err := adminBiz(ctx).ReadGraph(spaceName, graphName) if isErr(err, ctx) { return } ctx.JSON(http.StatusOK, GraphPersistenceResponse{Workers: workers}) } type AdminGraphCreateHandler struct { common.SenHandler } type AdminGraphsHandler struct { common.SenHandler } func (gh *AdminGraphsHandler) GET(ctx *gin.Context) { spaceName := ctx.Param("space_name") biz := adminGraphBiz(ctx, spaceName) graphs, err := biz.GetSpaceGraphs(spaceName) if isErr(err, ctx) { return } ctx.JSON(http.StatusOK, GraphsResponse{Graphs: graphs}) } func (gh *AdminGraphsHandler) DELETE(ctx *gin.Context) { spaceName := ctx.Param("space_name") deleteCount := 0 failedGraphs := make([]string, 0) if spaceName != "" { biz := adminGraphBiz(ctx, spaceName) graphs, err := biz.GetGraphs() if err != nil { return } for _, graph := range graphs { if graph.SpaceName != spaceName { continue } err := biz.DeleteGraph(graph.Name) if err == nil { deleteCount++ } else { failedGraphs = append(failedGraphs, fmt.Sprintf("space: %v graph: %v delete error: %v", spaceName, graph.Name, err)) } } } if len(failedGraphs) == 0 { ok(ctx, "deleted ok") } else { ok(ctx, fmt.Sprintf("deleted count:%v,failed:%v", deleteCount, failedGraphs)) } } // POST func (gh *AdminGraphCreateHandler) POST(ctx *gin.Context) { spaceName := ctx.Param("space_name") biz := adminGraphBiz(ctx, spaceName) req := GraphCreateRequest{} err := ctx.BindJSON(&req) if isBad(err != nil, ctx, func() string { return "request body not correct" }) { return } if _, err = biz.CreateGraph(req.Name, nil); isErr(err, ctx) { return } ok(ctx, "Graph creation successful.") } type AdminGraphHandler struct { common.SenHandler } // GET func (gh *AdminGraphHandler) GET(ctx *gin.Context) { spaceName := ctx.Param("space_name") name := ctx.Param("graph_name") biz := adminGraphBiz(ctx, spaceName) g, err := biz.GetGraph(name) if isErr(err, ctx) { return } ctx.JSON(http.StatusOK, GraphResponse{Graph: g}) } // DELETE func (gh *AdminGraphHandler) DELETE(ctx *gin.Context) { spaceName := ctx.Param("space_name") name := ctx.Param("graph_name") biz := adminGraphBiz(ctx, spaceName) if err := biz.DeleteGraph(name); isErr(err, ctx) { return } ok(ctx, "deleted ok") } type AdminEdgesHandler struct { common.SenHandler } func (eh *AdminEdgesHandler) GET(ctx *gin.Context) { spaceName := ctx.Param("space_name") graphName := ctx.Param("graph_name") biz := adminGraphBiz(ctx, spaceName) vertexId := ctx.Query("vertex_id") direction := ctx.Query("direction") if isBad(vertexId == "", ctx, func() string { return fmt.Sprintf("vertex_id not exist: %s", vertexId) }) { return } resp := EdgesResponse{} var err error resp.InEdges, resp.OutEdges, resp.InEdgeProperty, err = biz.GetEdges(graphName, vertexId, direction) if isErr(err, ctx) { return } ctx.JSON(http.StatusOK, resp) } type AdminVerticesHandler struct { common.SenHandler } func (vh *AdminVerticesHandler) POST(ctx *gin.Context) { spaceName := ctx.Param("space_name") graphName := ctx.Param("graph_name") biz := adminGraphBiz(ctx, spaceName) req := VerticesRequest{} err := ctx.BindJSON(&req) //校验参数 if isBad(err != nil, ctx, func() string { return fmt.Sprintf("request body not correct: %s", err) }) { return } //校验顶点数量 if isBad(len(req.VertexIds) == 0 || len(req.VertexIds) > getVertexLimitNum, ctx, func() string { return fmt.Sprintf("vertex_ids num can't be 0 and can't over %d", getVertexLimitNum) }) { return } vertices, err := biz.GetVertices(graphName, req.VertexIds) if isErr(err, ctx) { return } ctx.JSON(http.StatusOK, VerticesResponse{Vertices: vertices}) } type AdminWorkersHandler struct { common.SenHandler } type AdminWorkersResponse struct { common.BaseResp AllWorkers map[string][]*WorkerClient `json:"all_workers,omitempty"` GroupWorkers map[string][]*WorkerClient `json:"group_workers,omitempty"` SpaceWorkers map[string][]*WorkerClient `json:"space_workers,omitempty"` GraphWorkers map[string][]*WorkerClient `json:"graph_workers,omitempty"` CommonWorkers map[string][]*WorkerClient `json:"common_workers,omitempty"` DutyWorkers map[string][]*WorkerClient `json:"duty_workers,omitempty"` } func (vh *AdminWorkersHandler) GET(ctx *gin.Context) { adminBiz(ctx) // for checking auth dutyWorkers := make(map[string][]*WorkerClient) offlineWorkers := make([]*WorkerClient, 0) workerMap := make(map[string]*WorkerClient) for _, g := range GraphMgr.GetAllGraphs() { workers := make([]*WorkerClient, 0) for _, w := range g.Workers { worker := WorkerMgr.GetWorkerInfo(w.Name) if worker != nil { workers = append(workers, worker) if _, ok := workerMap[worker.Name]; !ok && worker.State == "OFFLINE" { offlineWorkers = append(offlineWorkers, worker) workerMap[worker.Name] = worker } } else { workers = append(workers, &WorkerClient{Name: w.Name, State: "NOT_FOUND"}) } } dutyWorkers[toSpaceGraph(g.SpaceName, g.Name)] = SortWorkersAsc(workers) } ctx.JSON(http.StatusOK, AdminWorkersResponse{ AllWorkers: map[string][]*WorkerClient{ "online": WorkerMgr.GetAllWorkers(), "offline": offlineWorkers, }, GroupWorkers: WorkerMgr.AllGroupWorkers(), SpaceWorkers: WorkerMgr.AllSpaceWorkers(), GraphWorkers: WorkerMgr.AllGraphWorkers(), CommonWorkers: WorkerMgr.CommonWorkers(), DutyWorkers: dutyWorkers, }) } type AdminWorkerGroupAllocHandler struct { common.SenHandler } func (wg *AdminWorkerGroupAllocHandler) DELETE(ctx *gin.Context) { adminBiz(ctx) // for checking auth workerGroup := ctx.Param("worker_group") // unallocate the worker group from both space and graph num, err := WorkerMgr.UnallocGroup(workerGroup) if isErr(err, ctx) { return } ok(ctx, fmt.Sprintf("unallocated worker group: '%s' success, updated: %d", workerGroup, num)) } type AdminWorkerGroupSpaceAllocHandler struct { common.SenHandler } func (vh *AdminWorkerGroupSpaceAllocHandler) POST(ctx *gin.Context) { adminBiz(ctx) // for checking auth workerGroup := ctx.Param("worker_group") spaceName := ctx.Param("space_name") err := WorkerMgr.AllocGroupSpace(workerGroup, spaceName) if isErr(err, ctx) { return } ok(ctx, fmt.Sprintf("worker group:'%s' allocated to space: '%s' success", workerGroup, spaceName)) } type AdminWorkerGroupGraphAllocHandler struct { common.SenHandler } func (vh *AdminWorkerGroupGraphAllocHandler) POST(ctx *gin.Context) { adminBiz(ctx) // for checking auth workerGroup := ctx.Param("worker_group") spaceName := ctx.Param("space_name") graphName := ctx.Param("graph_name") err := WorkerMgr.AllocGroupGraph(workerGroup, spaceName, graphName) if isErr(err, ctx) { return } ok(ctx, fmt.Sprintf("worker group:'%s' allocated to graph: '%s/%s' success", workerGroup, spaceName, graphName)) } type AdminWorkerGroupHandler struct { common.SenHandler } func (vh *AdminWorkerGroupHandler) POST(ctx *gin.Context) { adminBiz(ctx) // for checking auth workerName := ctx.Param("worker_name") workerGroup := ctx.Param("worker_group") err := WorkerMgr.SetWorkerGroup(workerName, strings.TrimSpace(workerGroup)) if isErr(err, ctx) { return } ok(ctx, fmt.Sprintf("set worker group success, worker: '%s' group: %s", workerName, workerGroup)) } type AdminDispatchPauseHandler struct { common.SenHandler } func (h *AdminDispatchPauseHandler) POST(ctx *gin.Context) { err := adminBiz(ctx).PauseDispatchTask() if isErr(err, ctx) { return } ok(ctx, "paused the scheduler dispatch successfully") } func (h *AdminDispatchPauseHandler) GET(ctx *gin.Context) { ok(ctx, strconv.FormatBool(adminBiz(ctx).IsDispatchPaused())) } type AdminDispatchResumeHandler struct { common.SenHandler } func (h *AdminDispatchResumeHandler) POST(ctx *gin.Context) { err := adminBiz(ctx).ResumeDispatchTask() if isErr(err, ctx) { return } ok(ctx, "resume the scheduler dispatch successfully") } // ----------------------------Threshold----------------------------------- type AdminThresholdMemHandler struct { common.SenHandler } type AdminThresholdMemRequest struct { GroupName string `json:"group_name"` MaxMem *uint32 `json:"max_mem"` MinFree *uint32 `json:"min_free"` GcPct *uint32 `json:"gc_pct"` } type AdminThresholdMemResponse struct { common.BaseResp AllMaxMem map[string]uint32 `json:"all_max_mem,omitempty"` AllMinFree map[string]uint32 `json:"all_min_free,omitempty"` AllGcPct map[string]uint32 `json:"all_gc_pct,omitempty"` } func (h *AdminThresholdMemHandler) POST(ctx *gin.Context) { req := AdminThresholdMemRequest{} err := ctx.BindJSON(&req) if isBad(err != nil, ctx, func() string { return fmt.Sprintf("request body not correct: %s", err) }) { return } if isBad(req.GroupName == "", ctx, func() string { return "the `group name` should not be empty" }) { return } flag := false if req.MaxMem != nil || req.MinFree != nil || req.GcPct != nil { flag = true } if isBad(!flag, ctx, func() string { return "there is nothing to set" }) { return } if req.MaxMem != nil && isErr(threshold.SetGroupMaxMem(req.GroupName, *req.MaxMem), ctx) { return } if req.MinFree != nil && isErr(threshold.SetGroupMinFree(req.GroupName, *req.MinFree), ctx) { return } if req.GcPct != nil && isErr(threshold.SetGroupGcPct(req.GroupName, *req.GcPct), ctx) { return } // send to workers via the group total, success, err := workers.SendMemLimitGroup(req.GroupName, &workers.WorkerMemLimit{ MaxMem: *req.MaxMem, MinFree: *req.MinFree, GcRatio: float32(*req.GcPct) / 100, }) if isBad(err != nil, ctx, func() string { return fmt.Sprintf("send group '%s' mem limit failed: %s, total: %d, success: %d", req.GroupName, err, total, success) }) { return } ok(ctx, fmt.Sprintf("set the threshold for `memory` successfully with group: '%s', total: %d, success: %d", req.GroupName, total, success)) } func (h *AdminThresholdMemHandler) GET(ctx *gin.Context) { ctx.JSON(http.StatusOK, AdminThresholdMemResponse{ AllMaxMem: threshold.AllGroupMaxMem(), AllMinFree: threshold.AllGroupMinFree(), AllGcPct: threshold.AllGroupGcPct(), }) }