store/cluster_node.go (235 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package store
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/apache/kvrocks-controller/util"
"github.com/go-playground/validator/v10"
"github.com/go-redis/redis/v8"
)
const (
RoleMaster = "master"
RoleSlave = "slave"
NodeIDLen = 40
)
const (
dialTimeout = 3200 * time.Millisecond
readTimeout = 3 * time.Second
writeTimeout = 3 * time.Second
minIdleConns = 3
)
var (
_validator = validator.New()
clients sync.Map
)
type Node interface {
ID() string
Password() string
Addr() string
IsMaster() bool
SetRole(string)
SetPassword(string)
Reset(ctx context.Context) error
GetClusterNodeInfo(ctx context.Context) (*ClusterNodeInfo, error)
GetClusterInfo(ctx context.Context) (*ClusterInfo, error)
SyncClusterInfo(ctx context.Context, cluster *Cluster) error
CheckClusterMode(ctx context.Context) (int64, error)
MigrateSlot(ctx context.Context, slot SlotRange, NodeID string) error
MarshalJSON() ([]byte, error)
UnmarshalJSON(data []byte) error
GetClusterNodesString(ctx context.Context) (string, error)
}
type ClusterNode struct {
id string
addr string
role string
password string
createdAt int64
}
type ClusterInfo struct {
CurrentEpoch int64 `json:"cluster_current_epoch"`
MigratingSlot *SlotRange `json:"migrating_slot"`
MigratingState string `json:"migrating_state"`
}
type ClusterNodeInfo struct {
Sequence uint64 `json:"sequence"`
Role string `json:"role"`
}
func NewClusterNode(addr, password string) *ClusterNode {
return &ClusterNode{
id: util.GenerateNodeID(),
addr: addr,
password: password,
role: RoleMaster,
createdAt: time.Now().Unix(),
}
}
func (n *ClusterNode) ID() string {
return n.id
}
func (n *ClusterNode) Password() string {
return n.password
}
func (n *ClusterNode) SetPassword(password string) {
n.password = password
}
func (n *ClusterNode) SetRole(role string) {
n.role = role
}
func (n *ClusterNode) Addr() string {
return n.addr
}
func (n *ClusterNode) Validate() error {
if len(n.id) == 0 {
return errors.New("node id shouldn't be empty")
}
if len(n.id) != NodeIDLen {
return errors.New("the length of node id must be 40")
}
if n.role != RoleMaster && n.role != RoleSlave {
return errors.New("node role should be 'master' or 'slave'")
}
return _validator.Struct(n)
}
func (n *ClusterNode) IsMaster() bool {
return n.role == RoleMaster
}
func (n *ClusterNode) GetClient() *redis.Client {
if client, ok := clients.Load(n.ID()); ok {
if rdsClient, ok := client.(*redis.Client); ok {
return rdsClient
}
}
client := redis.NewClient(&redis.Options{
Addr: n.addr,
Password: n.password,
DialTimeout: dialTimeout,
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
MaxRetries: -1, // don't retry inside the client
MinIdleConns: minIdleConns,
})
clients.Store(n.ID(), client)
return client
}
func (n *ClusterNode) CheckClusterMode(ctx context.Context) (int64, error) {
clusterInfo, err := n.GetClusterInfo(ctx)
if err != nil {
if strings.Contains(err.Error(), "cluster is not initialized") {
return -1, nil
}
return -1, fmt.Errorf("error while checking node cluster mode: %w", err)
}
return clusterInfo.CurrentEpoch, nil
}
func (n *ClusterNode) GetClusterInfo(ctx context.Context) (*ClusterInfo, error) {
infoStr, err := n.GetClient().ClusterInfo(ctx).Result()
if err != nil {
return nil, err
}
clusterInfo := &ClusterInfo{CurrentEpoch: -1}
lines := strings.Split(infoStr, "\r\n")
for _, line := range lines {
fields := strings.Split(line, ":")
if len(fields) != 2 {
continue
}
fields[1] = strings.TrimSpace(fields[1])
switch strings.ToLower(strings.TrimSpace(fields[0])) {
case "cluster_current_epoch":
clusterInfo.CurrentEpoch, err = strconv.ParseInt(fields[1], 10, 64)
if err != nil {
return nil, err
}
case "migrating_slot", "migrating_slot(s)":
// TODO(@git-hulk): handle multiple migrating slots
clusterInfo.MigratingSlot, err = ParseSlotRange(fields[1])
if err != nil {
return nil, err
}
case "migrating_state":
clusterInfo.MigratingState = fields[1]
}
}
return clusterInfo, nil
}
func (n *ClusterNode) GetClusterNodeInfo(ctx context.Context) (*ClusterNodeInfo, error) {
infoStr, err := n.GetClient().Info(ctx).Result()
if err != nil {
return nil, err
}
clusterNodeInfo := &ClusterNodeInfo{}
lines := strings.Split(infoStr, "\r\n")
for _, line := range lines {
fields := strings.Split(line, ":")
if len(fields) != 2 {
continue
}
switch fields[0] {
case "sequence":
clusterNodeInfo.Sequence, err = strconv.ParseUint(fields[1], 10, 64)
if err != nil {
return nil, err
}
case "role":
clusterNodeInfo.Role = fields[1]
}
}
return clusterNodeInfo, nil
}
func (n *ClusterNode) GetClusterNodesString(ctx context.Context) (string, error) {
clusterNodesStr, err := n.GetClient().ClusterNodes(ctx).Result()
if err != nil {
return "", err
}
return strings.TrimRight(clusterNodesStr, "\n"), nil
}
func (n *ClusterNode) SyncClusterInfo(ctx context.Context, cluster *Cluster) error {
clusterStr, err := cluster.ToSlotString()
if err != nil {
return err
}
redisCli := n.GetClient()
err = redisCli.Do(ctx, "CLUSTERX", "SETNODEID", n.id).Err()
if err != nil {
return err
}
return redisCli.Do(ctx, "CLUSTERX", "SETNODES", clusterStr, cluster.Version.Load()).Err()
}
func (n *ClusterNode) Reset(ctx context.Context) error {
return n.GetClient().ClusterResetHard(ctx).Err()
}
func (n *ClusterNode) MigrateSlot(ctx context.Context, slot SlotRange, targetNodeID string) error {
return n.GetClient().Do(ctx, "CLUSTERX", "MIGRATE", slot.String(), targetNodeID).Err()
}
func (n *ClusterNode) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]interface{}{
"id": n.id,
"addr": n.addr,
"role": n.role,
"password": n.password,
"created_at": n.createdAt,
})
}
func (n *ClusterNode) UnmarshalJSON(bytes []byte) error {
var data struct {
ID string `json:"id"`
Addr string `json:"addr"`
Role string `json:"role"`
Password string `json:"password"`
CreatedAt int64 `json:"created_at"`
}
if err := json.Unmarshal(bytes, &data); err != nil {
return err
}
n.id = data.ID
n.addr = data.Addr
n.role = data.Role
n.password = data.Password
n.createdAt = data.CreatedAt
return nil
}