func()

in collector/coordinator/utils.go [23:115]


// compareCheckpointAndDbTs compares the stored checkpoint of every replica set
// (or the single mongos checkpoint when fetching through change stream) with
// the timestamps currently reported for that replica set, and decides whether
// incremental sync can start directly.
//
// syncModeAll tells the function that the sync mode includes a full sync; in
// that case a replica set without a usable checkpoint always yields
// "cannot run incr sync directly".
//
// Returns, in order:
//   - smallestNew as reported by the timestamp lookup (the smallest of all the
//     newest timestamps), or 0 on error;
//   - a map of replica-set name => start timestamp; nil when incremental sync
//     cannot start directly or on error;
//   - true when incremental sync can start directly from the returned map;
//   - a non-nil error when fetching timestamps or checkpoints failed.
func (coordinator *ReplicationCoordinator) compareCheckpointAndDbTs(syncModeAll bool) (int64, map[string]int64, bool, error) {
	var (
		tsMap       map[string]utils.TimestampNode
		startTsMap  map[string]int64 // replica-set name => timestamp
		smallestNew int64
		err         error
	)

	if testSelectSyncMode {
		// only used for unit test
		tsMap, _, smallestNew, _, _, err = utils.GetAllTimestampInUT()
	} else {
		// smallestNew is the smallest of the all newest timestamp
		tsMap, _, smallestNew, _, _, err = utils.GetAllTimestamp(coordinator.MongoD, conf.Options.MongoSslRootCaFile)
	}
	// Checked once for both branches so that a failure in the unit-test
	// lookup is not silently ignored.
	if err != nil {
		return 0, nil, false, fmt.Errorf("get all timestamp failed: %v", err)
	}

	// +1 leaves room for the optional mongos entry added below.
	startTsMap = make(map[string]int64, len(tsMap)+1)

	// The configured start position is shifted into the high 32 bits so it can
	// be compared with the timestamps in tsMap and in the checkpoints.
	confTs32 := conf.Options.CheckpointStartPosition
	confTsMongoTs := confTs32 << 32

	LOG.Info("all node timestamp map: %v CheckpointStartPosition:%v", tsMap, utils.Int64ToTimestamp(confTsMongoTs))

	// fetch mongos checkpoint when using change stream
	var mongosCkpt *ckpt.CheckpointContext
	if coordinator.MongoS != nil &&
		conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodChangeStream {
		LOG.Info("try to fetch mongos checkpoint")
		ckptManager := ckpt.NewCheckpointManager(coordinator.MongoS.ReplicaName, 0)
		ckptVar, exist, ckptErr := ckptManager.Get()
		switch {
		case ckptErr != nil:
			return 0, nil, false, fmt.Errorf("get mongos[%v] checkpoint failed: %v",
				coordinator.MongoS.ReplicaName, ckptErr)
		case ckptVar == nil:
			return 0, nil, false, fmt.Errorf("get mongos[%v] checkpoint empty", coordinator.MongoS.ReplicaName)
		case !exist || ckptVar.Timestamp <= 1: // empty
			// mongosCkpt stays nil, so every replica set below falls back to
			// its own checkpoint; the mongos entry uses the configuration.
			startTsMap[coordinator.MongoS.ReplicaName] = int64(confTsMongoTs) // use configuration
		default:
			mongosCkpt = ckptVar
			startTsMap[coordinator.MongoS.ReplicaName] = int64(ckptVar.Timestamp) // use old checkpoint
		}
	}

	for replName, ts := range tsMap {
		var ckptRemote *ckpt.CheckpointContext
		if mongosCkpt == nil {
			ckptManager := ckpt.NewCheckpointManager(replName, 0)
			ckptVar, exist, ckptErr := ckptManager.Get()
			switch {
			case ckptErr != nil:
				return 0, nil, false, fmt.Errorf("get mongod[%v] checkpoint failed: %v", replName, ckptErr)
			case !exist:
				// no stored checkpoint: ckptRemote stays nil
			case ckptVar == nil:
				// Reported as existing but nothing was returned; fail like the
				// mongos path does instead of dereferencing a nil pointer.
				return 0, nil, false, fmt.Errorf("get mongod[%v] checkpoint empty", replName)
			case ckptVar.Timestamp <= 1:
				// empty checkpoint: ckptRemote stays nil
			default:
				ckptRemote = ckptVar
			}

			LOG.Info("%s checkpoint using mongod/replica_set: %s, ckptRemote set? [%v]", replName,
				ckptVar, ckptRemote != nil)
		} else {
			ckptRemote = mongosCkpt
			LOG.Info("%s checkpoint using mongos: %s", replName, mongosCkpt)
		}

		if ckptRemote == nil {
			// Without a checkpoint, incr sync cannot start directly when a full
			// sync is requested, or when a start position is configured but the
			// oldest available timestamp is already at or past it.
			if syncModeAll || (confTsMongoTs > (1<<32) && ts.Oldest >= confTsMongoTs) {
				LOG.Info("%s syncModeAll[%v] ts.Oldest[%v], confTsMongoTs[%v]", replName, syncModeAll, ts.Oldest,
					confTsMongoTs)
				return smallestNew, nil, false, nil
			}
			startTsMap[replName] = int64(confTsMongoTs)
			continue
		}

		// checkpoint less than the oldest timestamp, ckpt.OplogDiskQueue == "" means not enable
		// disk persist
		if ts.Oldest >= ckptRemote.Timestamp && ckptRemote.OplogDiskQueue == "" {
			LOG.Info("%s ts.Oldest[%v] >= ckptRemote.Timestamp[%v], need full sync", replName,
				ts.Oldest, ckptRemote.Timestamp)

			// can't run incr sync directly
			return smallestNew, nil, false, nil
		}
		startTsMap[replName] = int64(ckptRemote.Timestamp)
	}

	return smallestNew, startTsMap, true, nil
}