store/cluster_shard.go (224 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package store
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"go.uber.org/zap"
"github.com/apache/kvrocks-controller/logger"
"github.com/apache/kvrocks-controller/consts"
)
type Shard struct {
Nodes []Node `json:"nodes"`
SlotRanges []SlotRange `json:"slot_ranges"`
TargetShardIndex int `json:"target_shard_index"`
MigratingSlot *SlotRange `json:"migrating_slot"`
}
type Shards []*Shard
func (s Shards) Len() int {
return len(s)
}
func (s Shards) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s Shards) Less(i, j int) bool {
if len(s[i].SlotRanges) == 0 {
return false
} else if len(s[j].SlotRanges) == 0 {
return true
}
return s[i].SlotRanges[0].Start < s[j].SlotRanges[0].Start
}
func NewShard() *Shard {
return &Shard{
Nodes: make([]Node, 0),
SlotRanges: make([]SlotRange, 0),
MigratingSlot: nil,
TargetShardIndex: -1,
}
}
func (shard *Shard) Clone() *Shard {
clone := NewShard()
clone.SlotRanges = make([]SlotRange, len(shard.SlotRanges))
copy(clone.SlotRanges, shard.SlotRanges)
clone.TargetShardIndex = shard.TargetShardIndex
clone.MigratingSlot = shard.MigratingSlot
clone.Nodes = make([]Node, len(shard.Nodes))
copy(clone.Nodes, shard.Nodes)
return clone
}
func (shard *Shard) ClearMigrateState() {
shard.MigratingSlot = nil
shard.TargetShardIndex = -1
}
func (shard *Shard) IsServicing() bool {
for _, slotRange := range shard.SlotRanges {
if slotRange.Start != -1 || slotRange.Stop != -1 {
return true
}
}
return shard.IsMigrating()
}
func (shard *Shard) addNode(addr, role, password string) error {
if role != RoleMaster && role != RoleSlave {
return fmt.Errorf("%w: role", consts.ErrInvalidArgument)
}
for _, node := range shard.Nodes {
if node.Addr() == addr {
return consts.ErrAlreadyExists
}
}
if role == RoleMaster && len(shard.Nodes) > 0 {
return fmt.Errorf("master node %w", consts.ErrAlreadyExists)
}
node := NewClusterNode(addr, password)
node.SetRole(role)
shard.Nodes = append(shard.Nodes, node)
return nil
}
func (shard *Shard) IsMigrating() bool {
return shard.MigratingSlot != nil && shard.TargetShardIndex != -1
}
func (shard *Shard) GetMasterNode() Node {
for _, node := range shard.Nodes {
if node.IsMaster() {
return node
}
}
return nil
}
func (shard *Shard) removeNode(nodeID string) error {
isFound := false
for i, node := range shard.Nodes {
if node.ID() != nodeID {
continue
}
if node.IsMaster() {
return fmt.Errorf("cannot remove master node: %w", consts.ErrInvalidArgument)
}
shard.Nodes = append(shard.Nodes[:i], shard.Nodes[i+1:]...)
isFound = true
}
if !isFound {
return consts.ErrNotFound
}
return nil
}
func (shard *Shard) getNewMasterNodeIndex(ctx context.Context, masterNodeIndex int, preferredNodeID string) int {
newMasterNodeIndex := -1
var newestOffset uint64
for i, node := range shard.Nodes {
// don't promote the current master node
if i == masterNodeIndex {
continue
}
clusterNodeInfo, err := node.GetClusterNodeInfo(ctx)
if err != nil {
logger.Get().With(
zap.Error(err),
zap.String("id", node.ID()),
zap.String("addr", node.Addr()),
).Warn("Skip the node due to failed to get cluster info")
continue
}
// If the preferredNodeID is not empty, we will use it as the new master node.
if preferredNodeID != "" && node.ID() == preferredNodeID {
newMasterNodeIndex = i
break
}
if clusterNodeInfo.Sequence >= newestOffset {
newMasterNodeIndex = i
newestOffset = clusterNodeInfo.Sequence
}
}
return newMasterNodeIndex
}
// PromoteNewMaster promotes a new master node in the shard,
// it will return the new master node ID.
//
// The masterNodeID is used to check if the node is the current master node if it's not empty.
// The preferredNodeID is used to specify the preferred node to be promoted as the new master node,
// it will choose the node with the highest sequence number if the preferredNodeID is empty.
func (shard *Shard) promoteNewMaster(ctx context.Context, masterNodeID, preferredNodeID string) (string, error) {
if len(shard.Nodes) <= 1 {
return "", consts.ErrShardNoReplica
}
oldMasterNodeIndex := -1
for i, node := range shard.Nodes {
if node.IsMaster() {
oldMasterNodeIndex = i
break
}
}
if oldMasterNodeIndex == -1 {
return "", consts.ErrOldMasterNodeNotFound
}
if masterNodeID != "" && shard.Nodes[oldMasterNodeIndex].ID() != masterNodeID {
return "", consts.ErrNodeIsNotMaster
}
newMasterNodeIndex := shard.getNewMasterNodeIndex(ctx, oldMasterNodeIndex, preferredNodeID)
if newMasterNodeIndex == -1 {
return "", consts.ErrShardNoMatchNewMaster
}
shard.Nodes[oldMasterNodeIndex].SetRole(RoleSlave)
shard.Nodes[newMasterNodeIndex].SetRole(RoleMaster)
preferredNewMasterNode := shard.Nodes[newMasterNodeIndex]
return preferredNewMasterNode.ID(), nil
}
func (shard *Shard) HasOverlap(slotRange *SlotRange) bool {
for _, shardSlotRange := range shard.SlotRanges {
if shardSlotRange.HasOverlap(slotRange) {
return true
}
}
return false
}
func (shard *Shard) ToSlotsString() (string, error) {
var builder strings.Builder
masterNodeIndex := -1
for i, node := range shard.Nodes {
if node.IsMaster() {
masterNodeIndex = i
break
}
}
if masterNodeIndex == -1 {
return "", errors.New("missing master node")
}
for i, node := range shard.Nodes {
builder.WriteString(node.ID())
builder.WriteByte(' ')
builder.WriteString(strings.Replace(node.Addr(), ":", " ", 1))
builder.WriteByte(' ')
if i == masterNodeIndex {
builder.WriteString(RoleMaster)
builder.WriteByte(' ')
builder.WriteByte('-')
builder.WriteByte(' ')
for j, slotRange := range shard.SlotRanges {
builder.WriteString(slotRange.String())
if j != len(shard.SlotRanges)-1 {
builder.WriteByte(' ')
}
}
} else {
builder.WriteString(RoleSlave)
builder.WriteByte(' ')
builder.WriteString(shard.Nodes[masterNodeIndex].ID())
}
builder.WriteByte('\n')
}
return builder.String(), nil
}
// UnmarshalJSON unmarshal a Shard from JSON bytes,
// it's required since Shard.Nodes is an interface slice.
// So we need to take into a concrete type.
func (shard *Shard) UnmarshalJSON(bytes []byte) error {
var data struct {
SlotRanges []SlotRange `json:"slot_ranges"`
TargetShardIndex int `json:"target_shard_index"`
MigratingSlot *SlotRange `json:"migrating_slot"`
Nodes []*ClusterNode `json:"nodes"`
}
if err := json.Unmarshal(bytes, &data); err != nil {
return err
}
shard.SlotRanges = data.SlotRanges
shard.TargetShardIndex = data.TargetShardIndex
shard.MigratingSlot = data.MigratingSlot
shard.Nodes = make([]Node, len(data.Nodes))
for i, node := range data.Nodes {
shard.Nodes[i] = node
}
return nil
}