server/api/cluster.go (151 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package api import ( "errors" "strings" "github.com/gin-gonic/gin" "github.com/apache/kvrocks-controller/consts" "github.com/apache/kvrocks-controller/server/helper" "github.com/apache/kvrocks-controller/store" ) type MigrateSlotRequest struct { Target int `json:"target" validate:"required"` Slot store.SlotRange `json:"slot" validate:"required"` SlotOnly bool `json:"slot_only"` } type CreateClusterRequest struct { Name string `json:"name" validate:"required"` Nodes []string `json:"nodes" validate:"required"` Password string `json:"password"` Replicas int `json:"replicas"` } type ClusterHandler struct { s store.Store } func (handler *ClusterHandler) List(c *gin.Context) { namespace := c.Param("namespace") clusters, err := handler.s.ListCluster(c, namespace) if err != nil && !errors.Is(err, consts.ErrNotFound) { helper.ResponseError(c, err) return } helper.ResponseOK(c, gin.H{"clusters": clusters}) } func (handler *ClusterHandler) Get(c *gin.Context) { cluster, _ := c.MustGet(consts.ContextKeyCluster).(*store.Cluster) helper.ResponseOK(c, gin.H{"cluster": cluster}) } func (handler *ClusterHandler) Create(c *gin.Context) { namespace := c.Param("namespace") var req CreateClusterRequest if err := c.BindJSON(&req); err != nil { helper.ResponseBadRequest(c, err) return } clusterStore := handler.s if err := clusterStore.CheckNewNodes(c, req.Nodes); err != nil { helper.ResponseError(c, err) return } cluster, err := store.NewCluster(req.Name, req.Nodes, req.Replicas) if err != nil { helper.ResponseBadRequest(c, err) return } cluster.SetPassword(req.Password) checkClusterMode := strings.ToLower(c.GetHeader(consts.HeaderDontCheckClusterMode)) == "yes" for _, node := range cluster.GetNodes() { if !checkClusterMode { break } version, err := node.CheckClusterMode(c) if err != nil { helper.ResponseError(c, err) return } if version != -1 { helper.ResponseBadRequest(c, errors.New("node is already in cluster mode")) return } } if err := clusterStore.CreateCluster(c, namespace, cluster); err != nil { helper.ResponseError(c, err) return } helper.ResponseCreated(c, gin.H{"cluster": cluster}) } func (handler *ClusterHandler) Remove(c *gin.Context) { namespace := c.Param("namespace") cluster := c.Param("cluster") err := handler.s.RemoveCluster(c, namespace, cluster) if err != nil { helper.ResponseError(c, err) return } helper.ResponseNoContent(c) } func (handler *ClusterHandler) MigrateSlot(c *gin.Context) { namespace := c.Param("namespace") cluster, _ := c.MustGet(consts.ContextKeyCluster).(*store.Cluster) var req MigrateSlotRequest if err := c.BindJSON(&req); err != nil { helper.ResponseBadRequest(c, err) return } err := cluster.MigrateSlot(c, req.Slot, req.Target, req.SlotOnly) if err != nil { helper.ResponseError(c, err) return } if req.SlotOnly { err = handler.s.UpdateCluster(c, namespace, cluster) } else { // The version should be increased after the slot migration is done err = handler.s.SetCluster(c, namespace, cluster) } if err != nil { helper.ResponseError(c, err) return } helper.ResponseOK(c, gin.H{"cluster": cluster}) } func (handler *ClusterHandler) Import(c *gin.Context) { namespace := c.Param("namespace") clusterName := c.Param("cluster") var req struct { Nodes []string `json:"nodes" validate:"required"` Password string `json:"password"` } if err := c.BindJSON(&req); err != nil { helper.ResponseBadRequest(c, err) return } if len(req.Nodes) == 0 { helper.ResponseBadRequest(c, errors.New("nodes should NOT be empty")) return } firstNode := store.NewClusterNode(req.Nodes[0], req.Password) clusterNodesStr, err := firstNode.GetClusterNodesString(c) if err != nil { helper.ResponseError(c, err) return } cluster, err := store.ParseCluster(clusterNodesStr) if err != nil { helper.ResponseError(c, err) return } cluster.SetPassword(req.Password) newNodes := make([]string, 0) for _, node := range cluster.GetNodes() { newNodes = append(newNodes, node.Addr()) } if err := handler.s.CheckNewNodes(c, newNodes); err != nil { helper.ResponseError(c, err) return } cluster.Name = clusterName if err := handler.s.CreateCluster(c, namespace, cluster); err != nil { helper.ResponseError(c, err) return } helper.ResponseOK(c, gin.H{"cluster": cluster}) }