func GetChunkMapByUrl()

in sharding/sharding_operation.go [73:169]


func GetChunkMapByUrl(csUrl string) (ShardingChunkMap, error) {
	var conn *utils.MongoCommunityConn
	var err error
	if conn, err = utils.NewMongoCommunityConn(csUrl, utils.VarMongoConnectModePrimary, true,
		utils.ReadWriteConcernMajority, utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile); conn == nil || err != nil {
		return nil, err
	}
	defer conn.Close()

	chunkMap := make(ShardingChunkMap)
	type ShardDoc struct {
		Tag  string `bson:"_id"`
		Host string `bson:"host"`
	}
	// map: _id -> replset name
	shardMap := make(map[string]string)
	var shardDoc ShardDoc

	shardCursor, err := conn.Client.Database(ConfigDB).Collection(ShardCol).Find(context.Background(), bson.M{})
	if err != nil {
		return nil, err
	}
	for shardCursor.Next(context.Background()) {
		err = shardCursor.Decode(&shardDoc)
		if err != nil {
			LOG.Warn("GetChunkMapByUrl Decode Failed, err[%v]", err)
			continue
		}

		replset := strings.Split(shardDoc.Host, "/")[0]
		shardMap[shardDoc.Tag] = replset
		chunkMap[replset] = make(DBChunkMap)
	}

	type ChunkDoc struct {
		Ns    string    `bson:"ns"`
		Min   *bson.Raw `bson:"min"`
		Max   *bson.Raw `bson:"max"`
		Shard string    `bson:"shard"`
	}
	// only sharded collections exist on "config.chunks"
	var chunkDoc ChunkDoc
	chunkCursor, err := conn.Client.Database(ConfigDB).Collection(ChunkCol).Find(context.Background(), bson.M{})
	if err != nil {
		return nil, err
	}
	for chunkCursor.Next(context.Background()) {
		err = chunkCursor.Decode(&chunkDoc)
		if err != nil {
			LOG.Warn("GetChunkMapByUrl Decode Failed, err[%v]", err)
			continue
		}

		// get all keys and shard type(range or hashed)
		keys, shardType, err := GetColShardType(conn, chunkDoc.Ns)
		if err != nil {
			return nil, err
		}

		// the namespace is sharded, chunk map of each shard need to initialize
		for _, dbChunkMap := range chunkMap {
			if _, ok := dbChunkMap[chunkDoc.Ns]; !ok {
				dbChunkMap[chunkDoc.Ns] = &ShardCollection{Keys: keys, ShardType: shardType}
			}
		}

		// validate "min" and "max" in chunk
		replset := shardMap[chunkDoc.Shard]
		var minD, maxD bson.D
		err1 := bson.Unmarshal(*chunkDoc.Min, &minD)
		err2 := bson.Unmarshal(*chunkDoc.Max, &maxD)
		if err1 != nil || err2 != nil || len(minD) != len(maxD) {
			return nil, fmt.Errorf("GetChunkMapByUrl get illegal chunk doc min[%v] max[%v]. err1[%v] err2[%v]",
				minD, maxD, err1, err2)
		}

		shardCol := chunkMap[replset][chunkDoc.Ns]
		var mins, maxs []interface{}
		for i, item := range minD {
			if item.Key != shardCol.Keys[i] {
				return nil, fmt.Errorf("GetChunkMapByUrl get illegal chunk doc min[%v] keys[%v]",
					minD, shardCol.Keys)
			}
			mins = append(mins, item.Value)
		}
		for i, item := range maxD {
			if item.Key != shardCol.Keys[i] {
				return nil, fmt.Errorf("GetChunkMapByUrl get illegal chunk doc max[%v] keys[%v]",
					maxD, shardCol.Keys)
			}
			maxs = append(maxs, item.Value)
		}
		chunkRange := &ChunkRange{Mins: mins, Maxs: maxs}
		shardCol.Chunks = append(shardCol.Chunks, chunkRange)
	}
	return chunkMap, nil
}