admin-cli/executor/toolkits/nodesbalancer/migrator.go
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package nodesbalancer

import (
	"fmt"
	"math"
	"time"

	migrator "github.com/apache/incubator-pegasus/admin-cli/client"
	"github.com/apache/incubator-pegasus/admin-cli/executor"
	"github.com/apache/incubator-pegasus/admin-cli/executor/toolkits"
	"github.com/apache/incubator-pegasus/admin-cli/executor/toolkits/diskbalancer"
	"github.com/apache/incubator-pegasus/admin-cli/util"
	"github.com/apache/incubator-pegasus/go-client/idl/base"
)
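
// NodesCapacity records the total, used, and available disk capacity of a
// single replica node, together with the per-disk breakdown.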
type NodesCapacity struct {
	Node      *util.PegasusNode             `json:"node"`
	Disks     []executor.DiskCapacityStruct
	Total     int64                         `json:"total"`
	Usage     int64                         `json:"usage"`
	Available int64                         `json:"available"`
}
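
// NodesReplica groups the replicas hosted by a single replica node.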
type NodesReplica struct {
	Node     *util.PegasusNode
	Replicas []*executor.ReplicaCapacityStruct
}
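
// Migrator tracks the cluster-wide capacity load and proposes replica
// migrations from the most-loaded node to the least-loaded one.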
type Migrator struct {
	CapacityLoad []NodesCapacity
	Total        int64
	Usage        int64
	Average      int64
}
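
// reset clears the aggregated statistics before a fresh round of load collection.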
func (m *Migrator) reset() {
	// drop the per-node entries as well, so that repeated rounds of
	// updateNodesLoad don't accumulate stale nodes in CapacityLoad
	m.CapacityLoad = m.CapacityLoad[:0]
	m.Total = 0
	m.Average = 0
	m.Usage = 0
}
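
// updateNodesLoad queries every replica node's disk usage and rebuilds the
// capacity table sorted by usage ascending, so the busiest node sits at the tail.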
func (m *Migrator) updateNodesLoad(client *executor.Client) error {
	nodes, err := client.Meta.ListNodes()
	if err != nil {
		return err
	}

	var nodesLoad []interface{}
	for _, node := range nodes {
		pegasusNode := client.Nodes.MustGetReplica(node.Address.GetAddress())
		disksLoad, totalUsage, totalCapacity, err := diskbalancer.QueryDiskCapacityInfo(client, pegasusNode.TCPAddr())
		if err != nil {
			return err
		}
		diskCapacity := NodesCapacity{
			Node:      pegasusNode,
			Disks:     disksLoad,
			Total:     totalCapacity,
			Usage:     totalUsage,
			Available: totalCapacity - totalUsage,
		}
		nodesLoad = append(nodesLoad, diskCapacity)
	}
	if nodesLoad == nil {
		// report the empty result explicitly instead of silently succeeding
		return fmt.Errorf("no replica node load collected")
	}

	m.reset()
	util.SortStructsByField(nodesLoad, "Usage")
	for _, node := range nodesLoad {
		m.CapacityLoad = append(m.CapacityLoad, node.(NodesCapacity))
		m.Total += node.(NodesCapacity).Total
		m.Usage += node.(NodesCapacity).Usage
	}
	m.Average = m.Usage / int64(len(nodesLoad))
	return nil
}
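
// partition describes one replica to migrate: its gpid, the balance
// operation to issue, and its on-disk size.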
type partition struct {
	Gpid   *base.Gpid
	Status migrator.BalanceType
	Size   int64
}
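
// ActionProposal is a single balance step: move `replica` from node `from`
// to node `to`.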
type ActionProposal struct {
	replica *partition
	from    *NodesCapacity
	to      *NodesCapacity
}
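
// toString renders the proposal as "[status]gpid:from=>to".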
func (act *ActionProposal) toString() string {
	return fmt.Sprintf("[%s]%s:%s=>%s", act.replica.Status.String(), act.replica.Gpid.String(),
		act.from.Node.String(), act.to.Node.String())
}
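
// selectNextAction picks the next migration: it takes the busiest disk on the
// most-loaded node and looks for a replica that fits within the allowed move
// size and is not already hosted by the least-loaded node.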
func (m *Migrator) selectNextAction(client *executor.Client) (*ActionProposal, error) {
	highNode := m.CapacityLoad[len(m.CapacityLoad)-1]
	lowNode := m.CapacityLoad[0]
	highDiskOfHighNode := highNode.Disks[len(highNode.Disks)-1]
	toolkits.LogInfo(fmt.Sprintf("expect_average = %dGB, high node = %s[%s][usage=%dGB], low node = %s[usage=%dGB]\n",
		m.Average/1024, highNode.Node.String(), highDiskOfHighNode.Disk, highNode.Usage/1024, lowNode.Node.String(), lowNode.Usage/1024))

	lowUsageRatio := lowNode.Usage * 100 / lowNode.Total
	highUsageRatio := highNode.Usage * 100 / highNode.Total
	if highUsageRatio-lowUsageRatio <= 5 {
		return nil, fmt.Errorf("high node and low node have little difference: %d%% vs %d%%", highUsageRatio, lowUsageRatio)
	}

	// a move must not push the high node below the average or the low node above it
	sizeAllowMoved := math.Min(float64(highNode.Usage-m.Average), float64(m.Average-lowNode.Usage))
	highDiskReplicasOfHighNode, err := getDiskReplicas(client, &highNode, highDiskOfHighNode.Disk)
	if err != nil {
		return nil, fmt.Errorf("get high node[%s] high disk[%s] replicas err: %s", highNode.Node.String(), highDiskOfHighNode.Disk, err.Error())
	}
	totalReplicasOfLowNode, err := getNodeReplicas(client, &lowNode)
	if err != nil {
		return nil, fmt.Errorf("get low node[%s] replicas err: %s", lowNode.Node.String(), err.Error())
	}

	var selectReplica executor.ReplicaCapacityStruct
	for _, replica := range highDiskReplicasOfHighNode {
		if replica.Size > int64(sizeAllowMoved) {
			toolkits.LogDebug(fmt.Sprintf("skip replica: too large to move (replica_size > allow_size): %d > %f", replica.Size, sizeAllowMoved))
			continue
		}
		if totalReplicasOfLowNode.contain(replica.Gpid) {
			toolkits.LogDebug(fmt.Sprintf("skip replica(%s): it already exists on target node(%s)", replica.Gpid, lowNode.Node.String()))
			continue
		}
		// no break: the last qualifying replica in the list wins
		selectReplica = replica
	}
	if selectReplica.Gpid == "" {
		return nil, fmt.Errorf("can't find valid replica to balance")
	}

	gpid, err := util.Str2Gpid(selectReplica.Gpid)
	if err != nil {
		return nil, err
	}
	status := migrator.BalanceCopySec
	if selectReplica.Status == "primary" {
		status = migrator.BalanceCopyPri
	}
	return &ActionProposal{
		replica: &partition{
			Gpid:   gpid,
			Status: status,
		},
		from: &highNode,
		to:   &lowNode,
	}, nil
}
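
// replicas is a queryable list of replica capacity entries.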
type replicas []executor.ReplicaCapacityStruct
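
// contain reports whether the list holds a replica with the given gpid string.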
func (r replicas) contain(selectReplica string) bool {
	for _, replica := range r {
		if replica.Gpid == selectReplica {
			return true
		}
	}
	return false
}
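
// getDiskReplicas lists the replicas on one disk (identified by diskTag) of
// the given replica server.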
func getDiskReplicas(client *executor.Client, replicaServer *NodesCapacity, diskTag string) (replicas, error) {
	node := replicaServer.Node.TCPAddr()
	diskInfo, err := executor.GetDiskInfo(client, executor.CapacitySize, node, "", diskTag, false)
	if err != nil {
		return nil, err
	}
	replicas, err := executor.ConvertReplicaCapacityStruct(diskInfo)
	if err != nil {
		return nil, err
	}
	return replicas, nil
}
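
// getNodeReplicas lists the replicas on every disk of the given replica server.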
func getNodeReplicas(client *executor.Client, replicaServer *NodesCapacity) (replicas, error) {
	node := replicaServer.Node.TCPAddr()
	var totalDiskInfo []interface{}
	for _, disk := range replicaServer.Disks {
		tag := disk.Disk
		diskInfo, err := executor.GetDiskInfo(client, executor.CapacitySize, node, "", tag, false)
		if err != nil {
			return nil, err
		}
		totalDiskInfo = append(totalDiskInfo, diskInfo...)
	}
	replicas, err := executor.ConvertReplicaCapacityStruct(totalDiskInfo)
	if err != nil {
		return nil, err
	}
	return replicas, nil
}
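
// waitCompleted polls the target node every 10s until it hosts the migrated
// replica, then briefly raises the meta level to `lively` so the cluster can
// garbage-collect the stale copy before the next action.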
func waitCompleted(client *executor.Client, action *ActionProposal) error {
	for {
		replicas, err := getNodeReplicas(client, action.to)
		if err != nil {
			toolkits.LogInfo(err.Error())
			time.Sleep(time.Second * 10)
			continue
		}
		if !replicas.contain(fmt.Sprintf("%d.%d", action.replica.Gpid.Appid,
			action.replica.Gpid.PartitionIndex)) {
			toolkits.LogInfo(fmt.Sprintf("%s is running", action.toString()))
			time.Sleep(time.Second * 10)
			continue
		}
		break
	}
	toolkits.LogInfo(fmt.Sprintf("%s is completed, wait 100s for garbage collection", action.toString()))

	// set meta level to lively so the cluster cleans up the stale replica
	err := executor.SetMetaLevel(client, "lively")
	if err != nil {
		return err
	}
	// recover meta level to steady before the next action
	time.Sleep(time.Second * 100)
	err = executor.SetMetaLevel(client, "steady")
	if err != nil {
		return err
	}
	fmt.Println()
	return nil
}