sharding/sharding_operation.go (172 lines of code) (raw):

package sharding import ( "context" "fmt" conf "github.com/alibaba/MongoShake/v2/collector/configure" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "strings" utils "github.com/alibaba/MongoShake/v2/common" "github.com/alibaba/MongoShake/v2/oplog" "reflect" LOG "github.com/vinllen/log4go" ) const ( ConfigDB = "config" SettingsCol = "settings" ShardCol = "shards" ChunkCol = "chunks" CollectionCol = "collections" HashedShard = "hashed" RangedShard = "ranged" ) // get balancer status from config server func GetBalancerStatusByUrl(csUrl string) (bool, error) { var conn *utils.MongoCommunityConn var err error if conn, err = utils.NewMongoCommunityConn(csUrl, utils.VarMongoConnectModePrimary, true, utils.ReadWriteConcernMajority, utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile); conn == nil || err != nil { return true, err } defer conn.Close() var result bson.M err = conn.Client.Database(ConfigDB).Collection(SettingsCol).FindOne(nil, bson.M{"_id": "balancer"}, nil).Decode(&result) if err != nil && err != mongo.ErrNoDocuments { return true, err } if stopped, ok := result["stopped"].(bool); ok { return !stopped, nil } else { return true, nil } } type ChunkRange struct { // the minimum/maximum of the chunk range of multiple columns shard key has multiple values Mins []interface{} Maxs []interface{} } type ShardCollection struct { Chunks []*ChunkRange // shard key may have multiple columns, for example {a:1, b:1, c:1} Keys []string ShardType string } // {replset: {namespace: []ChunkRange} } type ShardingChunkMap map[string]map[string]*ShardCollection type DBChunkMap map[string]*ShardCollection func GetChunkMapByUrl(csUrl string) (ShardingChunkMap, error) { var conn *utils.MongoCommunityConn var err error if conn, err = utils.NewMongoCommunityConn(csUrl, utils.VarMongoConnectModePrimary, true, utils.ReadWriteConcernMajority, utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile); conn == nil || err != nil { return nil, err } defer conn.Close() chunkMap := make(ShardingChunkMap) type ShardDoc struct { Tag string `bson:"_id"` Host string `bson:"host"` } // map: _id -> replset name shardMap := make(map[string]string) var shardDoc ShardDoc shardCursor, err := conn.Client.Database(ConfigDB).Collection(ShardCol).Find(context.Background(), bson.M{}) if err != nil { return nil, err } for shardCursor.Next(context.Background()) { err = shardCursor.Decode(&shardDoc) if err != nil { LOG.Warn("GetChunkMapByUrl Decode Failed, err[%v]", err) continue } replset := strings.Split(shardDoc.Host, "/")[0] shardMap[shardDoc.Tag] = replset chunkMap[replset] = make(DBChunkMap) } type ChunkDoc struct { Ns string `bson:"ns"` Min *bson.Raw `bson:"min"` Max *bson.Raw `bson:"max"` Shard string `bson:"shard"` } // only sharded collections exist on "config.chunks" var chunkDoc ChunkDoc chunkCursor, err := conn.Client.Database(ConfigDB).Collection(ChunkCol).Find(context.Background(), bson.M{}) if err != nil { return nil, err } for chunkCursor.Next(context.Background()) { err = chunkCursor.Decode(&chunkDoc) if err != nil { LOG.Warn("GetChunkMapByUrl Decode Failed, err[%v]", err) continue } // get all keys and shard type(range or hashed) keys, shardType, err := GetColShardType(conn, chunkDoc.Ns) if err != nil { return nil, err } // the namespace is sharded, chunk map of each shard need to initialize for _, dbChunkMap := range chunkMap { if _, ok := dbChunkMap[chunkDoc.Ns]; !ok { dbChunkMap[chunkDoc.Ns] = &ShardCollection{Keys: keys, ShardType: shardType} } } // validate "min" and "max" in chunk replset := shardMap[chunkDoc.Shard] var minD, maxD bson.D err1 := bson.Unmarshal(*chunkDoc.Min, &minD) err2 := bson.Unmarshal(*chunkDoc.Max, &maxD) if err1 != nil || err2 != nil || len(minD) != len(maxD) { return nil, fmt.Errorf("GetChunkMapByUrl get illegal chunk doc min[%v] max[%v]. err1[%v] err2[%v]", minD, maxD, err1, err2) } shardCol := chunkMap[replset][chunkDoc.Ns] var mins, maxs []interface{} for i, item := range minD { if item.Key != shardCol.Keys[i] { return nil, fmt.Errorf("GetChunkMapByUrl get illegal chunk doc min[%v] keys[%v]", minD, shardCol.Keys) } mins = append(mins, item.Value) } for i, item := range maxD { if item.Key != shardCol.Keys[i] { return nil, fmt.Errorf("GetChunkMapByUrl get illegal chunk doc max[%v] keys[%v]", maxD, shardCol.Keys) } maxs = append(maxs, item.Value) } chunkRange := &ChunkRange{Mins: mins, Maxs: maxs} shardCol.Chunks = append(shardCol.Chunks, chunkRange) } return chunkMap, nil } // input given namespace, return all keys and shard type(range or hashed) func GetColShardType(conn *utils.MongoCommunityConn, namespace string) ([]string, string, error) { var colDoc bson.D if err := conn.Client.Database(ConfigDB).Collection(CollectionCol).FindOne(context.Background(), bson.M{"_id": namespace}).Decode(&colDoc); err != nil { return nil, "", err } var keys []string var shardType string var ok bool if colDoc, ok = oplog.GetKey(colDoc, "key").(bson.D); !ok { return nil, "", fmt.Errorf("GetColShardType with namespace[%v] has no key item in doc %v", namespace, colDoc) } for _, item := range colDoc { fmt.Println(item) // either be a single hashed field, or a list of ascending fields switch v := item.Value.(type) { case string: shardType = HashedShard case int: shardType = RangedShard case float64: shardType = RangedShard default: return nil, "", fmt.Errorf("GetColShardType with namespace[%v] doc[%v] meet unknown ShakeKey type[%v]", namespace, colDoc, reflect.TypeOf(v)) } keys = append(keys, item.Key) } return keys, shardType, nil } type ShardCollectionSpec struct { Ns string Key bson.D Unique bool }