in collector/coordinator/utils.go [144:192]
// selectSyncMode resolves the sync mode the coordinator should actually run,
// together with the position to start from.
//
// A syncMode other than utils.VarSyncModeAll or utils.VarSyncModeIncr is
// handed back untouched with a nil start map and int64(0).
//
// Return values, in order:
//   - the sync mode to run ("" whenever an error is returned);
//   - the start timestamp map for incremental sync: either the map produced by
//     compareCheckpointAndDbTs, or, on the mongos-only path, a single entry
//     keyed by the ReplicaName of the first RealSourceIncrSync element; nil
//     when full sync is selected;
//   - an interface{} position: the token from isCheckpointExist on the
//     mongos-only path, smallestNewTs from compareCheckpointAndDbTs when
//     falling back to full sync, and int64(0) otherwise;
//   - an error when the checkpoint cannot be inspected, when the checkpoint
//     token is not an int64, when no incr sync source exists to resume from,
//     or when incremental sync was required but cannot be started.
func (coordinator *ReplicationCoordinator) selectSyncMode(syncMode string) (string, map[string]int64,
	interface{}, error) {
	if syncMode != utils.VarSyncModeAll && syncMode != utils.VarSyncModeIncr {
		return syncMode, nil, int64(0), nil
	}

	// special case, I hate it.
	// TODO, checkpoint support ResumeToken
	if conf.Options.SpecialSourceDBFlag == utils.VarSpecialSourceDBFlagAliyunServerless ||
		(len(conf.Options.MongoSUrl) > 0 && len(conf.Options.MongoCsUrl) == 0 && len(conf.Options.MongoUrls) == 0) {
		// for only mongo_s_url address exists
		if syncMode == utils.VarSyncModeIncr {
			// syncMode is known to be "incr" here, so the "sync mode is all"
			// argument is always false.
			_, startTsMap, _, err := coordinator.compareCheckpointAndDbTs(false)
			if err != nil {
				// This path is best-effort: keep running incr sync with whatever
				// map came back, but do not hide the failure.
				LOG.Warn("for only mongo_s_url address exists, compareCheckpointAndDbTs failed: %v", err)
			}
			LOG.Info("for only mongo_s_url address exists startTsMap[%v]", startTsMap)
			return syncMode, startTsMap, int64(0), nil
		}

		ok, token, err := coordinator.isCheckpointExist()
		if err != nil {
			return "", nil, int64(0), fmt.Errorf("check isCheckpointExist failed: %v", err)
		}
		if !ok {
			// No checkpoint yet: start with a full sync.
			return utils.VarSyncModeAll, nil, token, nil
		}

		// The checkpoint is only usable as a start timestamp when it is an
		// int64; report anything else instead of panicking on the assertion.
		startTs, isTs := token.(int64)
		if !isTs {
			return "", nil, int64(0), fmt.Errorf("checkpoint token has unexpected type %T, want int64", token)
		}
		if len(coordinator.RealSourceIncrSync) == 0 {
			return "", nil, int64(0), fmt.Errorf("no incr sync source available to resume from checkpoint")
		}

		startTsMap := map[string]int64{
			coordinator.RealSourceIncrSync[0].ReplicaName: startTs,
		}
		return utils.VarSyncModeIncr, startTsMap, token, nil
	}

	smallestNewTs, startTsMap, canIncrSync, err := coordinator.compareCheckpointAndDbTs(syncMode == utils.VarSyncModeAll)
	if err != nil {
		return "", nil, int64(0), err
	}

	if canIncrSync {
		LOG.Info("sync mode run %v", utils.VarSyncModeIncr)
		return utils.VarSyncModeIncr, startTsMap, int64(0), nil
	}

	if syncMode == utils.VarSyncModeIncr || conf.Options.Tunnel != utils.VarTunnelDirect {
		// bugfix v2.4.11: if can not run incr sync directly, return error when sync_mode == "incr"
		// bugfix v2.4.12: return error when tunnel != "direct"
		return "", nil, int64(0), fmt.Errorf("start time illegal, can't run incr sync")
	}

	// Incremental sync is not possible and a full sync is allowed.
	return utils.VarSyncModeAll, nil, smallestNewTs, nil
}