in collector/coordinator/replication.go [109:191]
// sanitizeMongoDB validates the checkpoint storage and every source MongoDB
// listed in coordinator.MongoD before replication starts.
//
// In order, it:
//  1. connects to conf.Options.CheckpointStorageUrl in primary mode and closes
//     the connection again (pure reachability check);
//  2. for each source: connects, verifies that the oplog namespace exists
//     (only when the sync mode is not "full" and the incremental fetch method
//     is oplog), resolves the replica set name (falling back to
//     "default-<index>" when empty), rejects duplicated names, and records the
//     name in src.ReplicaName;
//  3. when conf.Options.IncrSyncShardKey is oplog.ShardAutomatic, rewrites it
//     to oplog.ShardByNamespace if any source reports a unique index, and to
//     oplog.ShardByID otherwise.
//
// It returns a non-nil error as soon as any check fails. Every connection
// opened here is closed before returning.
func (coordinator *ReplicationCoordinator) sanitizeMongoDB() error {
	var conn *utils.MongoCommunityConn
	var err error
	hasUniqIndex := false
	// set of replica set names already seen, used to detect duplicates
	rs := map[string]struct{}{}

	// try to connect CheckpointStorage
	checkpointStorageUrl := conf.Options.CheckpointStorageUrl
	if conn, err = utils.NewMongoCommunityConn(checkpointStorageUrl, utils.VarMongoConnectModePrimary, true,
		utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault,
		conf.Options.CheckpointStorageUrlMongoSslRootCaFile); conn == nil || !conn.IsGood() || err != nil {
		// The guard also fires when err is nil (nil or unhealthy connection);
		// make sure the caller never receives a nil error on failure.
		if err == nil {
			err = errors.New("checkpoint storage connection is nil or not healthy")
		}
		// Mask credentials before logging, same as for the source urls below.
		LOG.Critical("Connect checkpointStorageUrl[%v] error[%v]. Please add primary node into 'mongo_urls' "+
			"if 'context.storage.url' is empty", utils.BlockMongoUrlPassword(checkpointStorageUrl, "***"), err)
		// Release a connection that was created but failed the health check.
		// NOTE(review): assumes Close is safe on an unhealthy connection — confirm.
		if conn != nil {
			conn.Close()
		}
		return err
	}
	conn.Close()

	// The oplog namespace is only required when incremental data is fetched
	// from the oplog; both options are constant for the whole loop.
	needOplog := conf.Options.SyncMode != utils.VarSyncModeFull &&
		conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodOplog

	for i, src := range coordinator.MongoD {
		maskedURL := utils.BlockMongoUrlPassword(src.URL, "***")

		if conn, err = utils.NewMongoCommunityConn(src.URL, conf.Options.MongoConnectMode, true,
			utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault,
			conf.Options.MongoSslRootCaFile); conn == nil || !conn.IsGood() || err != nil {
			// Same reasoning as above: never report success on a bad connection.
			if err == nil {
				err = errors.New("source mongo connection is nil or not healthy")
			}
			LOG.Critical("Connect mongo server error. %v, url : %s. "+
				"See https://github.com/alibaba/MongoShake/wiki/FAQ"+
				"#q-how-to-solve-the-oplog-tailer-initialize-failed-no-reachable-servers-error", err, maskedURL)
			if conn != nil {
				conn.Close()
			}
			return err
		}

		// a conventional ReplicaSet should have local.oplog.rs collection
		if needOplog && !conn.HasOplogNs(utils.GetListCollectionQueryCondition(conn)) {
			LOG.Critical("There has no oplog collection in mongo db server")
			conn.Close()
			return errors.New("no oplog ns in mongo. " +
				"See https://github.com/alibaba/MongoShake/wiki/FAQ" +
				"#q-how-to-solve-the-oplog-tailer-initialize-failed-no-oplog-ns-in-mongo-error")
		}

		// check if there is a duplicated server: every replica set (in RS or
		// Shard mode) must show up only once
		rsName := conn.AcquireReplicaSetName()
		// rsName falls back to an index-based default if empty
		if rsName == "" {
			rsName = fmt.Sprintf("default-%d", i)
			LOG.Warn("Source mongodb have empty replica set name, url[%s], change to default[%s]",
				maskedURL, rsName)
		}

		if _, exist := rs[rsName]; exist {
			LOG.Critical("There has duplicate replica set name : %s", rsName)
			conn.Close()
			return errors.New("duplicated replica set source")
		}
		rs[rsName] = struct{}{}
		// NOTE(review): this only persists if coordinator.MongoD holds pointers — confirm.
		src.ReplicaName = rsName

		// look around for a unique index; one hit on any source is enough
		if !hasUniqIndex && conf.Options.IncrSyncShardKey == oplog.ShardAutomatic {
			hasUniqIndex = conn.HasUniqueIndex(utils.GetListCollectionQueryCondition(conn))
		}

		// the connection is not reused by later stages
		conn.Close()
	}

	// we choose sharding by collection if there are unique indexes
	// existing in collections
	if conf.Options.IncrSyncShardKey == oplog.ShardAutomatic {
		if hasUniqIndex {
			conf.Options.IncrSyncShardKey = oplog.ShardByNamespace
		} else {
			conf.Options.IncrSyncShardKey = oplog.ShardByID
		}
	}

	// TODO, check unique exist on given namespace when
	// conf.Options.IncrSyncShardByObjectIdWhiteList is not empty

	return nil
}