in sharding/sharding_operation.go [73:169]
func GetChunkMapByUrl(csUrl string) (ShardingChunkMap, error) {
var conn *utils.MongoCommunityConn
var err error
if conn, err = utils.NewMongoCommunityConn(csUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernMajority, utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile); conn == nil || err != nil {
return nil, err
}
defer conn.Close()
chunkMap := make(ShardingChunkMap)
type ShardDoc struct {
Tag string `bson:"_id"`
Host string `bson:"host"`
}
// map: _id -> replset name
shardMap := make(map[string]string)
var shardDoc ShardDoc
shardCursor, err := conn.Client.Database(ConfigDB).Collection(ShardCol).Find(context.Background(), bson.M{})
if err != nil {
return nil, err
}
for shardCursor.Next(context.Background()) {
err = shardCursor.Decode(&shardDoc)
if err != nil {
LOG.Warn("GetChunkMapByUrl Decode Failed, err[%v]", err)
continue
}
replset := strings.Split(shardDoc.Host, "/")[0]
shardMap[shardDoc.Tag] = replset
chunkMap[replset] = make(DBChunkMap)
}
type ChunkDoc struct {
Ns string `bson:"ns"`
Min *bson.Raw `bson:"min"`
Max *bson.Raw `bson:"max"`
Shard string `bson:"shard"`
}
// only sharded collections exist on "config.chunks"
var chunkDoc ChunkDoc
chunkCursor, err := conn.Client.Database(ConfigDB).Collection(ChunkCol).Find(context.Background(), bson.M{})
if err != nil {
return nil, err
}
for chunkCursor.Next(context.Background()) {
err = chunkCursor.Decode(&chunkDoc)
if err != nil {
LOG.Warn("GetChunkMapByUrl Decode Failed, err[%v]", err)
continue
}
// get all keys and shard type(range or hashed)
keys, shardType, err := GetColShardType(conn, chunkDoc.Ns)
if err != nil {
return nil, err
}
// the namespace is sharded, chunk map of each shard need to initialize
for _, dbChunkMap := range chunkMap {
if _, ok := dbChunkMap[chunkDoc.Ns]; !ok {
dbChunkMap[chunkDoc.Ns] = &ShardCollection{Keys: keys, ShardType: shardType}
}
}
// validate "min" and "max" in chunk
replset := shardMap[chunkDoc.Shard]
var minD, maxD bson.D
err1 := bson.Unmarshal(*chunkDoc.Min, &minD)
err2 := bson.Unmarshal(*chunkDoc.Max, &maxD)
if err1 != nil || err2 != nil || len(minD) != len(maxD) {
return nil, fmt.Errorf("GetChunkMapByUrl get illegal chunk doc min[%v] max[%v]. err1[%v] err2[%v]",
minD, maxD, err1, err2)
}
shardCol := chunkMap[replset][chunkDoc.Ns]
var mins, maxs []interface{}
for i, item := range minD {
if item.Key != shardCol.Keys[i] {
return nil, fmt.Errorf("GetChunkMapByUrl get illegal chunk doc min[%v] keys[%v]",
minD, shardCol.Keys)
}
mins = append(mins, item.Value)
}
for i, item := range maxD {
if item.Key != shardCol.Keys[i] {
return nil, fmt.Errorf("GetChunkMapByUrl get illegal chunk doc max[%v] keys[%v]",
maxD, shardCol.Keys)
}
maxs = append(maxs, item.Value)
}
chunkRange := &ChunkRange{Mins: mins, Maxs: maxs}
shardCol.Chunks = append(shardCol.Chunks, chunkRange)
}
return chunkMap, nil
}