// collector/coordinator/utils.go

package coordinator

import (
	"fmt"
	"sync"

	"github.com/alibaba/MongoShake/v2/collector/ckpt"
	conf "github.com/alibaba/MongoShake/v2/collector/configure"
	sourceReader "github.com/alibaba/MongoShake/v2/collector/reader"
	utils "github.com/alibaba/MongoShake/v2/common"

	LOG "github.com/vinllen/log4go"
	"go.mongodb.org/mongo-driver/bson"
)

/*
 * compare current checkpoint and database timestamp
 * @return:
 *     int64: the smallest newest timestamp of all mongod
 *     map[string]int64: replica-set name => start timestamp for incr sync
 *     bool: can run incremental sync directly?
 *     error: error
 */
func (coordinator *ReplicationCoordinator) compareCheckpointAndDbTs(syncModeAll bool) (int64, map[string]int64, bool, error) {
	var (
		tsMap       map[string]utils.TimestampNode
		startTsMap  map[string]int64 // replica-set name => timestamp
		smallestNew int64
		err         error
	)

	switch testSelectSyncMode {
	case true:
		// only used for unit test
		tsMap, _, smallestNew, _, _, err = utils.GetAllTimestampInUT()
	case false:
		// smallestNew is the smallest of the all newest timestamp
		tsMap, _, smallestNew, _, _, err = utils.GetAllTimestamp(coordinator.MongoD, conf.Options.MongoSslRootCaFile)
	}
	// bugfix: the error used to be checked only on the non-UT path, so the
	// unit-test branch silently continued with a possibly-nil tsMap.
	if err != nil {
		return 0, nil, false, fmt.Errorf("get all timestamp failed: %v", err)
	}

	startTsMap = make(map[string]int64, len(tsMap)+1)

	// the configured start position is a 32-bit seconds value; shift it into
	// the high half to build a MongoDB timestamp (seconds << 32 | increment)
	confTs32 := conf.Options.CheckpointStartPosition
	confTsMongoTs := confTs32 << 32
	LOG.Info("all node timestamp map: %v CheckpointStartPosition:%v", tsMap,
		utils.Int64ToTimestamp(confTsMongoTs))

	// fetch mongos checkpoint when using change stream
	var mongosCkpt *ckpt.CheckpointContext
	if coordinator.MongoS != nil && conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodChangeStream {
		LOG.Info("try to fetch mongos checkpoint")
		ckptManager := ckpt.NewCheckpointManager(coordinator.MongoS.ReplicaName, 0)
		ckptVar, exist, err := ckptManager.Get()
		if err != nil {
			return 0, nil, false, fmt.Errorf("get mongos[%v] checkpoint failed: %v",
				coordinator.MongoS.ReplicaName, err)
		} else if ckptVar == nil {
			return 0, nil, false, fmt.Errorf("get mongos[%v] checkpoint empty",
				coordinator.MongoS.ReplicaName)
		} else if !exist || ckptVar.Timestamp <= 1 { // empty
			mongosCkpt = nil
			// mongosCkpt = ckptVar // still use checkpoint
			startTsMap[coordinator.MongoS.ReplicaName] = int64(confTsMongoTs) // use configuration
		} else {
			mongosCkpt = ckptVar
			startTsMap[coordinator.MongoS.ReplicaName] = int64(ckptVar.Timestamp) // use old checkpoint
		}
	}

	for replName, ts := range tsMap {
		var ckptRemote *ckpt.CheckpointContext
		if mongosCkpt == nil {
			// no usable mongos checkpoint: consult each replica set's own checkpoint
			ckptManager := ckpt.NewCheckpointManager(replName, 0)
			ckptVar, exist, err := ckptManager.Get()
			if err != nil {
				return 0, nil, false, fmt.Errorf("get mongod[%v] checkpoint failed: %v", replName, err)
			} else if !exist || ckptVar.Timestamp <= 1 { // empty
				// set nil to make code more clear
				ckptRemote = nil
			} else {
				ckptRemote = ckptVar
			}
			LOG.Info("%s checkpoint using mongod/replica_set: %s, ckptRemote set? [%v]",
				replName, ckptVar, ckptRemote != nil)
		} else {
			ckptRemote = mongosCkpt
			LOG.Info("%s checkpoint using mongos: %s", replName, mongosCkpt)
		}

		if ckptRemote == nil {
			// no checkpoint: incr sync is only possible from the configured
			// start position, and only if the oplog still covers it
			if syncModeAll || (confTsMongoTs > (1<<32) && ts.Oldest >= confTsMongoTs) {
				LOG.Info("%s syncModeAll[%v] ts.Oldest[%v], confTsMongoTs[%v]",
					replName, syncModeAll, ts.Oldest, confTsMongoTs)
				return smallestNew, nil, false, nil
			}
			startTsMap[replName] = int64(confTsMongoTs)
		} else {
			// checkpoint less than the oldest timestamp, ckpt.OplogDiskQueue == "" means not enable
			// disk persist
			if ts.Oldest >= ckptRemote.Timestamp && ckptRemote.OplogDiskQueue == "" {
				LOG.Info("%s ts.Oldest[%v] >= ckptRemote.Timestamp[%v], need full sync",
					replName, ts.Oldest, ckptRemote.Timestamp)
				// can't run incr sync directly
				return smallestNew, nil, false, nil
			}
			startTsMap[replName] = int64(ckptRemote.Timestamp)
		}
	}

	return smallestNew, startTsMap, true, nil
}

// isCheckpointExist reports whether the first full-sync source already has a
// stored checkpoint. When no checkpoint exists it additionally fetches the
// newest change-stream resume token (PBRT) so the caller can persist a start
// position after full sync. The second return value is the checkpoint
// timestamp (int64) when one exists, otherwise the fetched resume token.
func (coordinator *ReplicationCoordinator) isCheckpointExist() (bool, interface{}, error) {
	ckptManager := ckpt.NewCheckpointManager(coordinator.RealSourceFullSync[0].ReplicaName, 0)
	ckptVar, exist, err := ckptManager.Get()
	LOG.Info("isCheckpointExist? %v %v %v", ckptVar, exist, err)
	if err != nil {
		return false, 0, fmt.Errorf("get mongod[%v] checkpoint failed: %v",
			coordinator.RealSourceFullSync[0].ReplicaName, err)
	} else if !exist {
		// send changestream
		reader, err := sourceReader.CreateReader(utils.VarIncrSyncMongoFetchMethodChangeStream,
			coordinator.RealSourceFullSync[0].URL, coordinator.RealSourceFullSync[0].ReplicaName)
		if err != nil {
			return false, 0, fmt.Errorf("create reader failed: %v", err)
		}

		resumeToken, err := reader.FetchNewestTimestamp()
		if err != nil {
			return false, 0, fmt.Errorf("fetch PBRT fail: %v", err)
		}
		LOG.Info("isCheckpointExist change stream resumeToken: %v", resumeToken)
		return false, resumeToken, nil
	}
	return true, ckptVar.Timestamp, nil
}

// if the oplog of checkpoint timestamp exist in all source db, then only do oplog replication instead of document replication
func (coordinator *ReplicationCoordinator) selectSyncMode(syncMode string) (string, map[string]int64, interface{}, error) {
	// "document" (or any other mode) needs no checkpoint comparison
	if syncMode != utils.VarSyncModeAll && syncMode != utils.VarSyncModeIncr {
		return syncMode, nil, int64(0), nil
	}

	// special case, I hate it.
	// TODO, checkpoint support ResumeToken
	if conf.Options.SpecialSourceDBFlag == utils.VarSpecialSourceDBFlagAliyunServerless ||
		(len(conf.Options.MongoSUrl) > 0 && len(conf.Options.MongoCsUrl) == 0 && len(conf.Options.MongoUrls) == 0) {
		// for only mongo_s_url address exists
		if syncMode == utils.VarSyncModeIncr {
			// bugfix: the error here used to be silently dropped, which could
			// return a nil start-timestamp map as if it were valid
			_, startTsMaptmp, _, err := coordinator.compareCheckpointAndDbTs(syncMode == utils.VarSyncModeAll)
			if err != nil {
				return "", nil, int64(0), err
			}
			LOG.Info("for only mongo_s_url address exists startTsMap[%v]", startTsMaptmp)
			return syncMode, startTsMaptmp, int64(0), nil
		}

		ok, token, err := coordinator.isCheckpointExist()
		if err != nil {
			return "", nil, int64(0), fmt.Errorf("check isCheckpointExist failed: %v", err)
		}
		if !ok {
			// no checkpoint: fall back to full sync, handing back the resume token
			return utils.VarSyncModeAll, nil, token, nil
		}
		startTsMap := map[string]int64{
			coordinator.RealSourceIncrSync[0].ReplicaName: token.(int64),
		}
		return utils.VarSyncModeIncr, startTsMap, token, nil
	}

	smallestNewTs, startTsMap, canIncrSync, err := coordinator.compareCheckpointAndDbTs(syncMode == utils.VarSyncModeAll)
	if err != nil {
		return "", nil, int64(0), err
	}

	if canIncrSync {
		LOG.Info("sync mode run %v", utils.VarSyncModeIncr)
		return utils.VarSyncModeIncr, startTsMap, int64(0), nil
	} else if syncMode == utils.VarSyncModeIncr || conf.Options.Tunnel != utils.VarTunnelDirect {
		// bugfix v2.4.11: if can not run incr sync directly, return error when sync_mode == "incr"
		// bugfix v2.4.12: return error when tunnel != "direct"
		return "", nil, int64(0), fmt.Errorf("start time illegal, can't run incr sync")
	} else {
		return utils.VarSyncModeAll, nil, smallestNewTs, nil
	}
}

/*
 * fetch all indexes.
 * the cost is low so that no need to run in parallel.
 */
func fetchIndexes(sourceList []*utils.MongoSource, filterFunc func(name string) bool) (map[utils.NS][]bson.D, error) {
	var mutex sync.Mutex
	indexMap := make(map[utils.NS][]bson.D)
	for _, src := range sourceList {
		// per-source work lives in a helper so its deferred conn.Close() fires
		// at the end of each iteration; the original deferred inside the loop,
		// keeping every connection open until the whole function returned
		if err := fetchSourceIndexes(src, filterFunc, indexMap, &mutex); err != nil {
			return nil, err
		}
	}
	return indexMap, nil
}

// fetchSourceIndexes fetches the indexes of every namespace of a single source
// and stores them into indexMap (guarded by mutex). The connection it opens is
// closed before returning.
func fetchSourceIndexes(src *utils.MongoSource, filterFunc func(name string) bool,
	indexMap map[utils.NS][]bson.D, mutex *sync.Mutex) error {
	LOG.Info("source[%v %v] start fetching index", src.ReplicaName,
		utils.BlockMongoUrlPassword(src.URL, "***"))

	// 1. fetch namespace
	nsList, _, err := utils.GetDbNamespace(src.URL, filterFunc, conf.Options.MongoSslRootCaFile)
	if err != nil {
		return fmt.Errorf("source[%v %v] get namespace failed: %v", src.ReplicaName, src.URL, err)
	}
	LOG.Info("index namespace list: %v", nsList)

	// 2. build connection
	conn, err := utils.NewMongoCommunityConn(src.URL, utils.VarMongoConnectModeSecondaryPreferred, true,
		utils.ReadWriteConcernLocal, utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile)
	if err != nil {
		return fmt.Errorf("source[%v %v] build connection failed: %v", src.ReplicaName, src.URL, err)
	}
	defer conn.Close() // scoped to this source only

	// 3. fetch all indexes
	for _, ns := range nsList {
		// indexes, err := conn.Session.DB(ns.Database).C(ns.Collection).Indexes()
		cursor, err := conn.Client.Database(ns.Database).Collection(ns.Collection).Indexes().List(nil)
		if err != nil {
			return fmt.Errorf("source[%v %v] fetch index failed: %v", src.ReplicaName, src.URL, err)
		}
		indexes := make([]bson.D, 0)
		if err = cursor.All(nil, &indexes); err != nil {
			return fmt.Errorf("index cursor fetch all indexes fail: %v", err)
		}

		mutex.Lock()
		indexMap[ns] = indexes
		mutex.Unlock()
	}
	LOG.Info("source[%v %v] finish fetching index", src.ReplicaName,
		utils.BlockMongoUrlPassword(src.URL, "***"))
	return nil
}