in collector/coordinator/utils.go [23:115]
// compareCheckpointAndDbTs compares the stored checkpoint of every replica set
// with the timestamps currently reported by the source and decides whether
// incremental sync can be started directly.
//
// syncModeAll forces the "cannot run incremental sync directly" result for any
// replica set that has no usable checkpoint.
//
// It returns, in order:
//   - smallestNew as reported by the timestamp lookup (the smallest of all the
//     newest timestamps); 0 when an error is returned.
//   - a map of replica-set name => start timestamp; nil unless the third
//     result is true.
//   - true when incremental sync can run directly from the returned positions,
//     false when it cannot (the caller is expected to fall back, see the
//     "need full sync" log below).
//   - a non-nil error when the timestamps or a checkpoint could not be fetched.
func (coordinator *ReplicationCoordinator) compareCheckpointAndDbTs(syncModeAll bool) (int64, map[string]int64, bool, error) {
	var (
		tsMap       map[string]utils.TimestampNode
		smallestNew int64
		err         error
	)

	if testSelectSyncMode {
		// only used for unit test
		tsMap, _, smallestNew, _, _, err = utils.GetAllTimestampInUT()
	} else {
		// smallestNew is the smallest of the all newest timestamp
		tsMap, _, smallestNew, _, _, err = utils.GetAllTimestamp(coordinator.MongoD, conf.Options.MongoSslRootCaFile)
	}
	// Checked for both branches so a failure in the unit-test path is not
	// silently ignored.
	if err != nil {
		return 0, nil, false, fmt.Errorf("get all timestamp failed: %v", err)
	}

	// replica-set name => timestamp; +1 leaves room for the mongos entry.
	startTsMap := make(map[string]int64, len(tsMap)+1)

	// The configured start position is shifted into the high 32 bits so it can
	// be compared with the timestamps in tsMap and in the checkpoints.
	confTs32 := conf.Options.CheckpointStartPosition
	confTsMongoTs := confTs32 << 32

	LOG.Info("all node timestamp map: %v CheckpointStartPosition:%v", tsMap, utils.Int64ToTimestamp(confTsMongoTs))

	// fetch mongos checkpoint when using change stream
	var mongosCkpt *ckpt.CheckpointContext
	if coordinator.MongoS != nil &&
		conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodChangeStream {
		LOG.Info("try to fetch mongos checkpoint")
		ckptManager := ckpt.NewCheckpointManager(coordinator.MongoS.ReplicaName, 0)
		ckptVar, exist, err := ckptManager.Get()
		switch {
		case err != nil:
			return 0, nil, false, fmt.Errorf("get mongos[%v] checkpoint failed: %v",
				coordinator.MongoS.ReplicaName, err)
		case ckptVar == nil:
			return 0, nil, false, fmt.Errorf("get mongos[%v] checkpoint empty", coordinator.MongoS.ReplicaName)
		case !exist || ckptVar.Timestamp <= 1:
			// empty: leave mongosCkpt nil so every replica set below looks up
			// its own checkpoint, and start mongos from the configuration.
			startTsMap[coordinator.MongoS.ReplicaName] = int64(confTsMongoTs) // use configuration
		default:
			mongosCkpt = ckptVar
			startTsMap[coordinator.MongoS.ReplicaName] = int64(ckptVar.Timestamp) // use old checkpoint
		}
	}

	for replName, ts := range tsMap {
		// ckptRemote stays nil when no usable checkpoint exists for replName.
		var ckptRemote *ckpt.CheckpointContext
		if mongosCkpt == nil {
			ckptManager := ckpt.NewCheckpointManager(replName, 0)
			ckptVar, exist, err := ckptManager.Get()
			switch {
			case err != nil:
				return 0, nil, false, fmt.Errorf("get mongod[%v] checkpoint failed: %v", replName, err)
			case !exist:
				// empty: keep ckptRemote nil
			case ckptVar == nil:
				// Reported as existing but nothing was returned: fail like the
				// mongos path does instead of dereferencing a nil pointer.
				return 0, nil, false, fmt.Errorf("get mongod[%v] checkpoint empty", replName)
			case ckptVar.Timestamp <= 1:
				// empty: keep ckptRemote nil
			default:
				ckptRemote = ckptVar
			}

			LOG.Info("%s checkpoint using mongod/replica_set: %s, ckptRemote set? [%v]", replName,
				ckptVar, ckptRemote != nil)
		} else {
			ckptRemote = mongosCkpt
			LOG.Info("%s checkpoint using mongos: %s", replName, mongosCkpt)
		}

		if ckptRemote == nil {
			// No checkpoint: incremental sync cannot start directly when the
			// caller requested sync mode "all", or when a start position is
			// configured (> 1<<32) and the oldest available timestamp is
			// already at or past it.
			if syncModeAll || (confTsMongoTs > (1<<32) && ts.Oldest >= confTsMongoTs) {
				LOG.Info("%s syncModeAll[%v] ts.Oldest[%v], confTsMongoTs[%v]", replName, syncModeAll, ts.Oldest,
					confTsMongoTs)
				return smallestNew, nil, false, nil
			}
			startTsMap[replName] = int64(confTsMongoTs)
			continue
		}

		// checkpoint less than the oldest timestamp, ckpt.OplogDiskQueue == "" means not enable
		// disk persist
		if ts.Oldest >= ckptRemote.Timestamp && ckptRemote.OplogDiskQueue == "" {
			LOG.Info("%s ts.Oldest[%v] >= ckptRemote.Timestamp[%v], need full sync", replName,
				ts.Oldest, ckptRemote.Timestamp)
			// can't run incr sync directly
			return smallestNew, nil, false, nil
		}
		startTsMap[replName] = int64(ckptRemote.Timestamp)
	}

	return smallestNew, startTsMap, true, nil
}