admin-cli/executor/toolkits/diskbalancer/migrator.go (212 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package diskbalancer import ( "fmt" "math" adminClient "github.com/apache/incubator-pegasus/admin-cli/client" "github.com/apache/incubator-pegasus/admin-cli/executor" "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits" "github.com/apache/incubator-pegasus/admin-cli/util" "github.com/apache/incubator-pegasus/go-client/idl/admin" "github.com/apache/incubator-pegasus/go-client/session" ) type DiskStats struct { diskCapacity executor.DiskCapacityStruct replicaCapacity []executor.ReplicaCapacityStruct } type MigrateDisk struct { averageUsage int64 currentNode string highDisk DiskStats lowDisk DiskStats } func (m *MigrateDisk) toString() string { return fmt.Sprintf("Node=%s, highDisk=%s[%dMB(%d%%)]=>lowDisk=%s[%dMB(%d%%)]", m.currentNode, m.highDisk.diskCapacity.Disk, m.highDisk.diskCapacity.Usage, m.highDisk.diskCapacity.Ratio, m.lowDisk.diskCapacity.Disk, m.lowDisk.diskCapacity.Usage, m.lowDisk.diskCapacity.Ratio) } type MigrateAction struct { node string replica executor.ReplicaCapacityStruct from string to string } func (m *MigrateAction) toString() string { return fmt.Sprintf("node=%s, replica=%s, %s=>%s", m.node, m.replica.Gpid, m.from, m.to) } func changeDiskCleanerInterval(client *executor.Client, replicaServer string, cleanInterval string) error { toolkits.LogInfo(fmt.Sprintf("set gc_disk_migration_origin_replica_interval_seconds = %ss ", cleanInterval)) err := executor.ConfigCommand(client, session.NodeTypeReplica, replicaServer, "gc_disk_migration_origin_replica_interval_seconds", "set", cleanInterval) return err } func getNextMigrateAction(client *executor.Client, replicaServer string, minSize int64) (*MigrateAction, error) { disks, totalUsage, totalCapacity, err := QueryDiskCapacityInfo(client, replicaServer) if err != nil { return nil, err } diskMigrateInfo, err := getMigrateDiskInfo(client, replicaServer, disks, totalUsage, totalCapacity) if err != nil { return nil, err } migrateAction, err := computeMigrateAction(diskMigrateInfo, minSize) if err != nil { return nil, err } return migrateAction, nil } func QueryDiskCapacityInfo(client *executor.Client, replicaServer string) ([]executor.DiskCapacityStruct, int64, int64, error) { diskCapacityOnNode, err := executor.GetDiskInfo(client, executor.CapacitySize, replicaServer, "", "", false) if err != nil { return nil, 0, 0, err } util.SortStructsByField(diskCapacityOnNode, "Usage") var disks []executor.DiskCapacityStruct var totalUsage int64 var totalCapacity int64 for _, disk := range diskCapacityOnNode { if s, ok := disk.(executor.DiskCapacityStruct); ok { disks = append(disks, s) totalUsage += s.Usage totalCapacity += s.Capacity } else { return nil, 0, 0, fmt.Errorf("can't covert to DiskCapacityStruct") } } if disks == nil { return nil, 0, 0, fmt.Errorf("the node(%s) has no ssd", replicaServer) } if len(disks) == 1 { return nil, 0, 0, fmt.Errorf("the node(%s) only has one disk, can't balance", replicaServer) } return disks, totalUsage, totalCapacity, nil } func getMigrateDiskInfo(client *executor.Client, replicaServer string, disks []executor.DiskCapacityStruct, totalUsage int64, totalCapacity int64) (*MigrateDisk, error) { highUsageDisk := disks[len(disks)-1] highDiskInfo, err := executor.GetDiskInfo(client, executor.CapacitySize, replicaServer, "", highUsageDisk.Disk, false) if err != nil { return nil, err } lowUsageDisk := disks[0] lowDiskInfo, err := executor.GetDiskInfo(client, executor.CapacitySize, replicaServer, "", lowUsageDisk.Disk, false) if err != nil { return nil, err } if highUsageDisk.Ratio < 10 { return nil, fmt.Errorf("no need balance since the high disk still enough capacity(balance threshold=10%%): "+ "high(%s): %dMB(%d%%); low(%s): %dMB(%d%%)", highUsageDisk.Disk, highUsageDisk.Usage, highUsageDisk.Ratio, lowUsageDisk.Disk, lowUsageDisk.Usage, lowUsageDisk.Ratio) } averageUsage := totalUsage / int64(len(disks)) averageRatio := totalUsage * 100 / totalCapacity if highUsageDisk.Ratio-lowUsageDisk.Ratio < 5 { return nil, fmt.Errorf("no need balance since the disk is balanced:"+ " high(%s): %dMB(%d%%); low(%s): %dMB(%d%%); average: %dMB(%d%%)", highUsageDisk.Disk, highUsageDisk.Usage, highUsageDisk.Ratio, lowUsageDisk.Disk, lowUsageDisk.Usage, lowUsageDisk.Ratio, averageUsage, averageRatio) } replicaCapacityOnHighDisk, err := executor.ConvertReplicaCapacityStruct(highDiskInfo) if err != nil { return nil, fmt.Errorf("parse replica info on high disk(%s) failed: %s", highUsageDisk.Disk, err.Error()) } replicaCapacityOnLowDisk, err := executor.ConvertReplicaCapacityStruct(lowDiskInfo) if err != nil { return nil, fmt.Errorf("parse replica info on low disk(%s) failed: %s", highUsageDisk.Disk, err.Error()) } return &MigrateDisk{ averageUsage: averageUsage, currentNode: replicaServer, highDisk: DiskStats{ diskCapacity: highUsageDisk, replicaCapacity: replicaCapacityOnHighDisk, }, lowDisk: DiskStats{ diskCapacity: lowUsageDisk, replicaCapacity: replicaCapacityOnLowDisk, }, }, nil } func computeMigrateAction(migrate *MigrateDisk, minSize int64) (*MigrateAction, error) { lowDiskCanReceiveMax := migrate.averageUsage - migrate.lowDisk.diskCapacity.Usage highDiskCanSendMax := migrate.highDisk.diskCapacity.Usage - migrate.averageUsage sizeNeedMove := int64(math.Min(float64(lowDiskCanReceiveMax), float64(highDiskCanSendMax))) var selectReplica *executor.ReplicaCapacityStruct for i := len(migrate.highDisk.replicaCapacity) - 1; i >= 0; i-- { if migrate.highDisk.replicaCapacity[i].Size > sizeNeedMove { continue } else { selectReplica = &migrate.highDisk.replicaCapacity[i] break } } if selectReplica == nil { return nil, fmt.Errorf("can't balance(%s): high disk min replica(%s) size(%dMB) must <= sizeNeedMove(%dMB)", migrate.toString(), migrate.highDisk.replicaCapacity[0].Gpid, migrate.highDisk.replicaCapacity[0].Size, sizeNeedMove) } // if select replica size is too small, it will need migrate many replica and result in `replica count not balance` among disk if selectReplica.Size < minSize { return nil, fmt.Errorf("not suggest balance(%s): the qualified(must<=sizeNeedMove(%dMB)) replica size(%s=%dMB) must >= minSize(%dMB))", migrate.toString(), sizeNeedMove, selectReplica.Gpid, selectReplica.Size, minSize) } fmt.Printf("ACTION:disk migrate(sizeNeedMove=%dMB): %s, gpid(%s)=%s(%dMB)\n", sizeNeedMove, migrate.toString(), selectReplica.Status, selectReplica.Gpid, selectReplica.Size) return &MigrateAction{ node: migrate.currentNode, replica: *selectReplica, from: migrate.highDisk.diskCapacity.Disk, to: migrate.lowDisk.diskCapacity.Disk, }, nil } func forceAssignReplicaToSecondary(client *executor.Client, replicaServer string, gpid string) error { fmt.Printf("WARNING: the select replica is not secondary, will force assign it secondary\n") if _, err := client.Meta.MetaControl(admin.MetaFunctionLevel_fl_steady); err != nil { return err } secondaryNode, err := getReplicaSecondaryNode(client, gpid) if err != nil { return err } replica, err := util.Str2Gpid(gpid) if err != nil { return err } return client.Meta.Balance(replica, adminClient.BalanceMovePri, util.NewNodeFromTCPAddr(replicaServer, session.NodeTypeReplica), secondaryNode) } func getReplicaSecondaryNode(client *executor.Client, gpid string) (*util.PegasusNode, error) { replica, err := util.Str2Gpid(gpid) if err != nil { return nil, err } tables, err := client.Meta.ListApps(admin.AppStatus_AS_AVAILABLE) if err != nil { return nil, fmt.Errorf("can't get the table name of replica %s when migrate the replica", gpid) } var tableName string for _, tb := range tables { if tb.AppID == replica.Appid { tableName = tb.AppName break } } if tableName == "" { return nil, fmt.Errorf("can't find the table for %s when migrate the replica", gpid) } resp, err := client.Meta.QueryConfig(tableName) if err != nil { return nil, fmt.Errorf("can't get the table %s configuration when migrate the replica(%s): %s", tableName, gpid, err) } var secondaryNode *util.PegasusNode for _, partition := range resp.Partitions { if partition.Pid.String() == replica.String() { secondaryNode = util.NewNodeFromTCPAddr(partition.Secondaries[0].GetAddress(), session.NodeTypeReplica) } } if secondaryNode == nil { return nil, fmt.Errorf("can't get the replica %s secondary node", gpid) } return secondaryNode, nil }