/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package store

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"sort"
	"strconv"
	"strings"
	"sync/atomic"

	"github.com/apache/kvrocks-controller/consts"
)

// Cluster is the topology of a cluster: a named, versioned list of shards.
// Version is excluded from plain JSON marshaling and handled by the custom
// MarshalJSON/UnmarshalJSON below.
type Cluster struct {
	Name    string       `json:"name"`
	Version atomic.Int64 `json:"-"`
	Shards  []*Shard     `json:"shards"`
}

// NewCluster builds a cluster topology from a flat list of node addresses.
// Every consecutive group of `replicas` addresses forms one shard, with the
// first address in each group acting as the master and the rest as slaves.
// Slot ranges are distributed evenly across the resulting shards, and a
// replicas value of 0 is treated as 1.
func NewCluster(name string, nodes []string, replicas int) (*Cluster, error) {
	if len(nodes) == 0 {
		return nil, errors.New("cluster nodes should NOT be empty")
	}
	if replicas < 0 {
		return nil, errors.New("replicas should NOT be less than 0")
	}
	if replicas == 0 {
		replicas = 1
	}
	if len(nodes)%replicas != 0 {
		return nil, errors.New("the number of cluster nodes should be divisible by replicas")
	}

	shardCount := len(nodes) / replicas
	shards := make([]*Shard, 0, shardCount)
	slotRanges := CalculateSlotRanges(shardCount)
	for i := 0; i < shardCount; i++ {
		shard := NewShard()
		shard.Nodes = make([]Node, 0)
		for j := 0; j < replicas; j++ {
			addr := nodes[i*replicas+j]
			role := RoleMaster
			if j != 0 {
				role = RoleSlave
			}
			node := NewClusterNode(addr, "")
			node.SetRole(role)
			shard.Nodes = append(shard.Nodes, node)
		}
		shard.SlotRanges = append(shard.SlotRanges, slotRanges[i])
		shards = append(shards, shard)
	}

	cluster := &Cluster{Name: name, Shards: shards}
	cluster.Version.Store(1)
	return cluster, nil
}
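
// For illustration only, a hedged sketch of how the grouping works (the
// addresses below are hypothetical): six nodes with replicas=2 produce three
// shards, each with one master followed by one slave.
//
//	cluster, err := NewCluster("demo", []string{
//		"10.0.0.1:6666", "10.0.0.2:6666", // shard 0: master, slave
//		"10.0.0.3:6666", "10.0.0.4:6666", // shard 1: master, slave
//		"10.0.0.5:6666", "10.0.0.6:6666", // shard 2: master, slave
//	}, 2)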

// Clone returns a deep copy of the cluster topology, including a copy of
// every shard and the current version.
func (cluster *Cluster) Clone() *Cluster {
	clone := &Cluster{
		Name:   cluster.Name,
		Shards: make([]*Shard, 0, len(cluster.Shards)),
	}
	clone.Version.Store(cluster.Version.Load())
	for _, shard := range cluster.Shards {
		clone.Shards = append(clone.Shards, shard.Clone())
	}
	return clone
}

// SetPassword will set the password for all nodes in the cluster.
func (cluster *Cluster) SetPassword(password string) {
	for i := 0; i < len(cluster.Shards); i++ {
		for j := 0; j < len(cluster.Shards[i].Nodes); j++ {
			cluster.Shards[i].Nodes[j].SetPassword(password)
		}
	}
}

// ToSlotString renders the slot distribution of the whole cluster by
// concatenating the slots string of every shard.
func (cluster *Cluster) ToSlotString() (string, error) {
	var builder strings.Builder
	for i, shard := range cluster.Shards {
		shardSlotsString, err := shard.ToSlotsString()
		if err != nil {
			return "", fmt.Errorf("found error at shard[%d]: %w", i, err)
		}
		builder.WriteString(shardSlotsString)
	}
	return builder.String(), nil
}

// GetShard returns the shard at the given index, or ErrIndexOutOfRange if the
// index is invalid.
func (cluster *Cluster) GetShard(shardIndex int) (*Shard, error) {
	if shardIndex < 0 || shardIndex >= len(cluster.Shards) {
		return nil, consts.ErrIndexOutOfRange
	}
	return cluster.Shards[shardIndex], nil
}

// AddNode adds a node with the given address, role, and password to the shard
// at the given index.
func (cluster *Cluster) AddNode(shardIndex int, addr, role, password string) error {
	if shardIndex < 0 || shardIndex >= len(cluster.Shards) {
		return consts.ErrIndexOutOfRange
	}
	return cluster.Shards[shardIndex].addNode(addr, role, password)
}

// RemoveNode removes the node with the given ID from the shard at the given index.
func (cluster *Cluster) RemoveNode(shardIndex int, nodeID string) error {
	if shardIndex < 0 || shardIndex >= len(cluster.Shards) {
		return consts.ErrIndexOutOfRange
	}
	return cluster.Shards[shardIndex].removeNode(nodeID)
}

// PromoteNewMaster promotes a replica of the shard at shardIdx to take over
// from masterNodeID; a non-empty preferredNodeID names the preferred candidate.
// It returns the ID of the newly promoted master.
func (cluster *Cluster) PromoteNewMaster(ctx context.Context,
	shardIdx int, masterNodeID, preferredNodeID string,
) (string, error) {
	shard, err := cluster.GetShard(shardIdx)
	if err != nil {
		return "", err
	}
	newMasterNodeID, err := shard.promoteNewMaster(ctx, masterNodeID, preferredNodeID)
	if err != nil {
		return "", err
	}
	cluster.Shards[shardIdx] = shard
	return newMasterNodeID, nil
}
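
// A hedged usage sketch (ctx and failedMasterID are hypothetical): after
// detecting that the master of shard 0 has failed, promote one of its replicas
// and then push the new topology to the nodes.
//
//	newMasterID, err := cluster.PromoteNewMaster(ctx, 0, failedMasterID, "")
//	if err == nil {
//		err = cluster.SyncToNodes(ctx)
//	}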

// SyncToNodes pushes the current cluster topology to every node in the
// cluster, stopping at the first node that fails to apply it.
func (cluster *Cluster) SyncToNodes(ctx context.Context) error {
	for i := 0; i < len(cluster.Shards); i++ {
		for _, node := range cluster.Shards[i].Nodes {
			if err := node.SyncClusterInfo(ctx, cluster); err != nil {
				return err
			}
		}
	}
	return nil
}

// GetNodes returns all nodes of the cluster across all shards.
func (cluster *Cluster) GetNodes() []Node {
	nodes := make([]Node, 0)
	for i := 0; i < len(cluster.Shards); i++ {
		nodes = append(nodes, cluster.Shards[i].Nodes...)
	}
	return nodes
}

// Reset resets every node in the cluster, stopping at the first error.
func (cluster *Cluster) Reset(ctx context.Context) error {
	for i := 0; i < len(cluster.Shards); i++ {
		for _, node := range cluster.Shards[i].Nodes {
			if err := node.Reset(ctx); err != nil {
				return err
			}
		}
	}
	return nil
}

// findShardIndexBySlot returns the index of the shard that owns the given slot
// range. It errors if the range overlaps multiple shards or belongs to none.
func (cluster *Cluster) findShardIndexBySlot(slot SlotRange) (int, error) {
	sourceShardIdx := -1
	for i := 0; i < len(cluster.Shards); i++ {
		slotRanges := cluster.Shards[i].SlotRanges
		for _, slotRange := range slotRanges {
			if slotRange.HasOverlap(&slot) {
				if sourceShardIdx != -1 {
					return sourceShardIdx, consts.ErrSlotRangeBelongsToMultipleShards
				}
				sourceShardIdx = i
			}
		}
	}
	if sourceShardIdx == -1 {
		return -1, consts.ErrSlotNotBelongToAnyShard
	}
	return sourceShardIdx, nil
}

// MigrateSlot moves the given slot range from the shard that currently owns it
// to the shard at targetShardIdx. When slotOnly is true, only the topology is
// updated; otherwise the source master is asked to migrate the data as well.
func (cluster *Cluster) MigrateSlot(ctx context.Context, slot SlotRange, targetShardIdx int, slotOnly bool) error {
	if targetShardIdx < 0 || targetShardIdx >= len(cluster.Shards) {
		return consts.ErrIndexOutOfRange
	}
	sourceShardIdx, err := cluster.findShardIndexBySlot(slot)
	if err != nil {
		return err
	}
	if sourceShardIdx == targetShardIdx {
		return consts.ErrShardIsSame
	}

	if slotOnly {
		cluster.Shards[sourceShardIdx].SlotRanges = RemoveSlotFromSlotRanges(cluster.Shards[sourceShardIdx].SlotRanges, slot)
		cluster.Shards[targetShardIdx].SlotRanges = AddSlotToSlotRanges(cluster.Shards[targetShardIdx].SlotRanges, slot)
		return nil
	}

	if cluster.Shards[sourceShardIdx].IsMigrating() || cluster.Shards[targetShardIdx].IsMigrating() {
		return consts.ErrShardSlotIsMigrating
	}
	// Send the migration command to the source master node, guarding against a
	// missing master on either side before dereferencing it.
	sourceMasterNode := cluster.Shards[sourceShardIdx].GetMasterNode()
	if sourceMasterNode == nil {
		return consts.ErrNotFound
	}
	targetMasterNode := cluster.Shards[targetShardIdx].GetMasterNode()
	if targetMasterNode == nil {
		return consts.ErrNotFound
	}
	if err := sourceMasterNode.MigrateSlot(ctx, slot, targetMasterNode.ID()); err != nil {
		return err
	}
	// The source node carries out the data migration in the background, so
	// record the in-flight migration on the shard.
	cluster.Shards[sourceShardIdx].MigratingSlot = &slot
	cluster.Shards[sourceShardIdx].TargetShardIndex = targetShardIdx
	return nil
}
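
// A hedged usage sketch (values and SlotRange field names are assumed for
// illustration): rebalance slots 0-100 onto shard 2, letting the source master
// move the data in the background.
//
//	slot := SlotRange{Start: 0, Stop: 100} // field names assumed
//	if err := cluster.MigrateSlot(ctx, slot, 2, false); err != nil {
//		// handle consts.ErrShardIsSame, consts.ErrShardSlotIsMigrating, ...
//	}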

// SetSlot bumps the cluster version and asks every node to reassign the slot
// to the target node by sending `CLUSTERX SETSLOT <slot> NODE <node-id> <version>`
// to each of them.
func (cluster *Cluster) SetSlot(ctx context.Context, slot int, targetNodeID string) error {
	version := cluster.Version.Add(1)
	for i := 0; i < len(cluster.Shards); i++ {
		for _, node := range cluster.Shards[i].Nodes {
			clusterNode, ok := node.(*ClusterNode)
			if !ok {
				continue
			}
			err := clusterNode.GetClient().Do(ctx, "CLUSTERX", "SETSLOT", slot, "NODE", targetNodeID, version).Err()
			if err != nil {
				return err
			}
		}
	}
	return nil
}

// ParseCluster will parse the cluster nodes string into the cluster topology.
// Judging by the indexes used below, each line has the form:
//
//	<id> <addr>@<cport> <flags> <master-id> <ping-sent> <pong-recv> <version> <link-state> [<slot-range> ...]
func ParseCluster(clusterStr string) (*Cluster, error) {
	if len(clusterStr) == 0 {
		return nil, errors.New("cluster nodes string should NOT be empty")
	}
	nodeStrings := strings.Split(clusterStr, "\n")
	if len(nodeStrings) == 0 {
		return nil, errors.New("cluster nodes string parser error")
	}

	var clusterVer int64 = -1
	var shards Shards
	slaveNodes := make(map[string][]Node)
	for _, nodeString := range nodeStrings {
		fields := strings.Split(nodeString, " ")
		if len(fields) < 7 {
			return nil, fmt.Errorf("require at least 7 fields, node info[%q]", nodeString)
		}
		node := &ClusterNode{
			id:   fields[0],
			addr: strings.Split(fields[1], "@")[0],
		}
		// The flags field may be a comma-separated list such as "myself,master";
		// keep only the role part.
		if strings.Contains(fields[2], ",") {
			node.role = strings.Split(fields[2], ",")[1]
		} else {
			node.role = fields[2]
		}

		var err error
		clusterVer, err = strconv.ParseInt(fields[6], 10, 64)
		if err != nil {
			return nil, fmt.Errorf("node version error, node info[%q]", nodeString)
		}
		if node.role == RoleMaster {
			shard := NewShard()
			shard.Nodes = append(shard.Nodes, node)
			// The remaining fields are slot ranges.
			for i := 8; i < len(fields); i++ {
				slotRange, err := ParseSlotRange(fields[i])
				if err != nil {
					return nil, fmt.Errorf("parse slots error for node[%q]: %w", nodeString, err)
				}
				shard.SlotRanges = append(shard.SlotRanges, *slotRange)
			}
			shards = append(shards, shard)
		} else if node.role == RoleSlave {
			// Group slaves by their master ID so they can be attached below.
			slaveNodes[fields[3]] = append(slaveNodes[fields[3]], node)
		} else {
			return nil, fmt.Errorf("node role error, node info[%q]", nodeString)
		}
	}
	if clusterVer == -1 {
		return nil, fmt.Errorf("no cluster version, cluster info[%q]", clusterStr)
	}

	// Attach every slave to the shard of its master.
	sort.Sort(shards)
	for i := 0; i < len(shards); i++ {
		masterNode := shards[i].Nodes[0]
		shards[i].Nodes = append(shards[i].Nodes, slaveNodes[masterNode.ID()]...)
	}
	clusterInfo := &Cluster{
		Shards: shards,
	}
	clusterInfo.Version.Store(clusterVer)
	return clusterInfo, nil
}
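
// For illustration, a hypothetical two-line input that ParseCluster accepts:
// a master owning slots 0-8191 followed by one of its slaves (IDs and
// addresses are made up).
//
//	e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@40001 myself,master - 0 0 1 connected 0-8191
//	07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:30004@40004 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 0 1 connected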

// MarshalJSON is a custom function since the atomic.Int64 type does not
// directly implement JSON marshaling.
func (cluster *Cluster) MarshalJSON() ([]byte, error) {
	type Alias Cluster // to avoid recursion
	return json.Marshal(&struct {
		Version int64 `json:"version"`
		*Alias
	}{
		Version: cluster.Version.Load(),
		Alias:   (*Alias)(cluster),
	})
}

// UnmarshalJSON is a custom function since the atomic.Int64 type does not
// directly implement JSON unmarshaling.
func (cluster *Cluster) UnmarshalJSON(data []byte) error {
	type Alias Cluster
	aux := &struct {
		Version int64 `json:"version"`
		*Alias
	}{
		Alias: (*Alias)(cluster),
	}
	if err := json.Unmarshal(data, &aux); err != nil {
		return err
	}
	cluster.Version.Store(aux.Version)
	return nil
}
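
// With these marshalers, a cluster serializes to JSON shaped like the
// following (values are hypothetical):
//
//	{"version":1,"name":"demo","shards":[...]}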