func()

in collector/coordinator/utils.go [144:192]


// selectSyncMode decides which sync mode the coordinator should actually run,
// given the sync mode requested by the caller.
//
// Returns, in order:
//   - the sync mode to run,
//   - a map of replica-set name => start timestamp (nil when not applicable),
//   - a position value: the checkpoint token in the mongo_s_url-only /
//     serverless branch, smallestNewTs when falling back to full sync in the
//     general branch, and int64(0) otherwise,
//   - an error.
//
// Behavior by case:
//   - syncMode is neither VarSyncModeAll nor VarSyncModeIncr: it is returned
//     unchanged with a nil map and int64(0).
//   - The source is flagged as Aliyun serverless, or only MongoSUrl is
//     configured: the decision is driven by isCheckpointExist (or, for an
//     explicit incr request, by compareCheckpointAndDbTs on a best-effort basis).
//   - Otherwise: compareCheckpointAndDbTs decides whether incr sync can start
//     directly; if it can not, full sync is chosen only when the caller asked
//     for VarSyncModeAll and the tunnel is VarTunnelDirect, else an error is
//     returned.
func (coordinator *ReplicationCoordinator) selectSyncMode(syncMode string) (string, map[string]int64,
	interface{}, error) {
	if syncMode != utils.VarSyncModeAll && syncMode != utils.VarSyncModeIncr {
		return syncMode, nil, int64(0), nil
	}

	// Evaluated once; passed to compareCheckpointAndDbTs in both branches below.
	fullSyncRequested := syncMode == utils.VarSyncModeAll

	// special case
	// TODO, checkpoint support ResumeToken
	mongoSOnly := len(conf.Options.MongoSUrl) > 0 &&
		len(conf.Options.MongoCsUrl) == 0 &&
		len(conf.Options.MongoUrls) == 0
	if conf.Options.SpecialSourceDBFlag == utils.VarSpecialSourceDBFlagAliyunServerless || mongoSOnly {
		// for only mongo_s_url address exists
		if syncMode == utils.VarSyncModeIncr {
			// Best effort: a failure here does not abort incr sync (this matches
			// the previous behavior), but it is logged instead of being dropped
			// silently so that a nil start map can be diagnosed.
			_, startTsMap, _, err := coordinator.compareCheckpointAndDbTs(fullSyncRequested)
			if err != nil {
				LOG.Info("for only mongo_s_url address exists, compareCheckpointAndDbTs failed and is ignored: %v", err)
			}
			LOG.Info("for only mongo_s_url address exists startTsMap[%v]", startTsMap)

			return syncMode, startTsMap, int64(0), nil
		}

		exist, token, err := coordinator.isCheckpointExist()
		if err != nil {
			return "", nil, int64(0), fmt.Errorf("check isCheckpointExist failed: %v", err)
		}
		if !exist {
			// No checkpoint yet: run a full sync and hand the token back to the caller.
			return utils.VarSyncModeAll, nil, token, nil
		}

		// A checkpoint exists: resume incr sync from it. Both the token type and
		// the source list are validated so that an unexpected value yields an
		// error rather than a runtime panic.
		startTs, isInt64 := token.(int64)
		if !isInt64 {
			return "", nil, int64(0), fmt.Errorf("checkpoint token has unexpected type %T, expect int64", token)
		}
		if len(coordinator.RealSourceIncrSync) == 0 {
			return "", nil, int64(0), fmt.Errorf("no incr sync source found, can't build start timestamp map")
		}

		startTsMap := map[string]int64{
			coordinator.RealSourceIncrSync[0].ReplicaName: startTs,
		}
		return utils.VarSyncModeIncr, startTsMap, token, nil
	}

	smallestNewTs, startTsMap, canIncrSync, err := coordinator.compareCheckpointAndDbTs(fullSyncRequested)
	if err != nil {
		return "", nil, int64(0), err
	}

	if canIncrSync {
		LOG.Info("sync mode run %v", utils.VarSyncModeIncr)
		return utils.VarSyncModeIncr, startTsMap, int64(0), nil
	}

	if syncMode == utils.VarSyncModeIncr || conf.Options.Tunnel != utils.VarTunnelDirect {
		// bugfix v2.4.11: if can not run incr sync directly, return error when sync_mode == "incr"
		// bugfix v2.4.12: return error when tunnel != "direct"
		return "", nil, int64(0), fmt.Errorf("start time illegal, can't run incr sync")
	}

	return utils.VarSyncModeAll, nil, smallestNewTs, nil
}