func()

in collector/coordinator/replication.go [109:191]


// sanitizeMongoDB runs pre-flight checks against the checkpoint storage and
// every source listed in coordinator.MongoD.
//
// Checks performed:
//   - the checkpoint storage (conf.Options.CheckpointStorageUrl) can be
//     connected to in primary mode and the connection reports IsGood;
//   - every source can be connected to with conf.Options.MongoConnectMode and
//     the connection reports IsGood;
//   - unless SyncMode is full-only, and when the incremental fetch method is
//     oplog, conn.HasOplogNs must report true for the source;
//   - replica set names are unique across sources; an empty name is replaced
//     by "default-<index>".
//
// Side effects: writes the resolved replica set name to src.ReplicaName, and,
// when conf.Options.IncrSyncShardKey is oplog.ShardAutomatic, rewrites it to
// oplog.ShardByNamespace if any source reports a unique index, otherwise to
// oplog.ShardByID.
//
// Returns a non-nil error as soon as any check fails; nil when all pass.
func (coordinator *ReplicationCoordinator) sanitizeMongoDB() error {
	hasUniqIndex := false
	// set of replica set names already seen, used for duplicate detection
	rs := map[string]int{}

	// try to connect CheckpointStorage
	checkpointStorageUrl := conf.Options.CheckpointStorageUrl
	ckptConn, err := utils.NewMongoCommunityConn(checkpointStorageUrl, utils.VarMongoConnectModePrimary, true,
		utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault,
		conf.Options.CheckpointStorageUrlMongoSslRootCaFile)
	if err != nil || ckptConn == nil || !ckptConn.IsGood() {
		if err == nil {
			// The constructor reported success but the connection is missing or
			// unhealthy: release it and make sure the caller sees a real error
			// instead of a nil one.
			if ckptConn != nil {
				ckptConn.Close()
			}
			err = errors.New("checkpoint storage connection is not usable")
		}
		// mask the password so credentials never reach the log
		LOG.Critical("Connect checkpointStorageUrl[%v] error[%v]. Please add primary node into 'mongo_urls' "+
			"if 'context.storage.url' is empty", utils.BlockMongoUrlPassword(checkpointStorageUrl, "***"), err)
		return err
	}
	// the connection was only needed as a reachability probe
	ckptConn.Close()

	for i, src := range coordinator.MongoD {
		maskedURL := utils.BlockMongoUrlPassword(src.URL, "***")

		conn, err := utils.NewMongoCommunityConn(src.URL, conf.Options.MongoConnectMode, true,
			utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault,
			conf.Options.MongoSslRootCaFile)
		if err != nil || conn == nil || !conn.IsGood() {
			if err == nil {
				// same reasoning as for the checkpoint storage above: never
				// report success for an unusable connection, and do not leak it
				if conn != nil {
					conn.Close()
				}
				err = fmt.Errorf("source mongo connection is not usable, url[%s]", maskedURL)
			}
			LOG.Critical("Connect mongo server error. %v, url : %s. "+
				"See https://github.com/alibaba/MongoShake/wiki/FAQ"+
				"#q-how-to-solve-the-oplog-tailer-initialize-failed-no-reachable-servers-error", err, maskedURL)
			return err
		}

		// a conventional ReplicaSet should have local.oplog.rs collection
		if conf.Options.SyncMode != utils.VarSyncModeFull &&
			conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodOplog &&
			!conn.HasOplogNs(utils.GetListCollectionQueryCondition(conn)) {

			LOG.Critical("There has no oplog collection in mongo db server")
			conn.Close()
			return errors.New("no oplog ns in mongo. " +
				"See https://github.com/alibaba/MongoShake/wiki/FAQ" +
				"#q-how-to-solve-the-oplog-tailer-initialize-failed-no-oplog-ns-in-mongo-error")
		}

		// check if there has dup server every replica set in RS or Shard
		rsName := conn.AcquireReplicaSetName()
		// rsName will be set to default if empty
		if rsName == "" {
			rsName = fmt.Sprintf("default-%d", i)
			LOG.Warn("Source mongodb have empty replica set name, url[%s], change to default[%s]",
				maskedURL, rsName)
		}

		if _, exist := rs[rsName]; exist {
			LOG.Critical("There has duplicate replica set name : %s", rsName)
			conn.Close()
			return errors.New("duplicated replica set source")
		}
		rs[rsName] = 1
		// NOTE(review): this only persists if coordinator.MongoD holds
		// pointers; with a slice of values it would update a copy — confirm.
		src.ReplicaName = rsName

		// look around if there has unique index; one hit is enough, so skip
		// the probe once a unique index has been found on an earlier source
		if !hasUniqIndex && conf.Options.IncrSyncShardKey == oplog.ShardAutomatic {
			hasUniqIndex = conn.HasUniqueIndex(utils.GetListCollectionQueryCondition(conn))
		}
		// doesn't reuse current connection
		conn.Close()
	}

	// we choose sharding by collection if there are unique index
	// existing in collections
	if conf.Options.IncrSyncShardKey == oplog.ShardAutomatic {
		if hasUniqIndex {
			conf.Options.IncrSyncShardKey = oplog.ShardByNamespace
		} else {
			conf.Options.IncrSyncShardKey = oplog.ShardByID
		}
	}

	// TODO, check unique exist on given namespace when
	// conf.Options.IncrSyncShardByObjectIdWhiteList is not empty

	return nil
}