in cmd/collector/sanitize.go [338:482]
func checkConflict() error {
/*****************************1. global settings******************************/
// http_profile & system_profile
conf.Options.FullSyncHTTPListenPort = utils.MayBeRandom(conf.Options.FullSyncHTTPListenPort)
conf.Options.IncrSyncHTTPListenPort = utils.MayBeRandom(conf.Options.IncrSyncHTTPListenPort)
if conf.Options.FullSyncHTTPListenPort == conf.Options.IncrSyncHTTPListenPort {
return fmt.Errorf("full_sync.http_port should not equal to incr_sync.http_port")
}
conf.Options.SystemProfilePort = utils.MayBeRandom(conf.Options.SystemProfilePort)
// check mongo_cs_url
if conf.Options.MongoCsUrl == "" && len(conf.Options.MongoUrls) > 1 {
return fmt.Errorf("mongo_cs_url be config server address when source MongoDB is sharding")
}
// set checkpoint.storage.url if empty
if conf.Options.CheckpointStorageUrl == "" {
if len(conf.Options.MongoUrls) == 1 {
// replica-set
conf.Options.CheckpointStorageUrl = conf.Options.MongoUrls[0]
} else if len(conf.Options.MongoSUrl) > 0 {
conf.Options.CheckpointStorageUrl = conf.Options.MongoSUrl
} else {
return fmt.Errorf("checkpoint.storage.url should be given when source is sharding")
}
}
// avoid the typo of mongo urls
if utils.HasDuplicated(conf.Options.MongoUrls) {
return fmt.Errorf("mongo urls were duplicated")
}
// quorm
if conf.Options.MasterQuorum && conf.Options.CheckpointStorage != utils.VarCheckpointStorageDatabase {
return fmt.Errorf("context storage should set to 'database' while master election enabled")
}
// filter
if len(conf.Options.FilterNamespaceBlack) != 0 && len(conf.Options.FilterNamespaceWhite) != 0 {
return fmt.Errorf("at most one of {filter.namespace.black, filter.namespace.white} can be given")
}
// filter - filter.pass.special.db
if len(conf.Options.FilterPassSpecialDb) != 0 {
// init ns
filter.InitNs(conf.Options.FilterPassSpecialDb)
}
// special variable
if conf.Options.SpecialSourceDBFlag != "" &&
conf.Options.SpecialSourceDBFlag != utils.VarSpecialSourceDBFlagAliyunServerless {
return fmt.Errorf("special.source.db.flag should be empty or 'aliyun_serverless'")
}
if conf.Options.SpecialSourceDBFlag == utils.VarSpecialSourceDBFlagAliyunServerless {
if conf.Options.IncrSyncMongoFetchMethod != utils.VarIncrSyncMongoFetchMethodChangeStream {
return fmt.Errorf("incr_sync.mongo_fetch_method must be 'change_stream' when special.source.db.flag is set")
}
}
/*****************************2. full sync******************************/
/*****************************3. incr sync******************************/
// set incr_sync.worker = shards number if source is sharding
if len(conf.Options.MongoUrls) > 1 {
if conf.Options.IncrSyncWorker != len(conf.Options.MongoUrls) &&
conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodOplog {
// only change when incr_sync.mongo_fetch_method = oplog
conf.Options.IncrSyncWorker = len(conf.Options.MongoUrls)
}
if conf.Options.FilterDDLEnable == true &&
conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodOplog {
return fmt.Errorf("DDL is not support for sharding when incr_sync.mongo_fetch_method == 'oplog'")
}
}
if conf.Options.Tunnel == utils.VarTunnelDirect &&
conf.Options.IncrSyncWorkerOplogCompressor != utils.VarIncrSyncWorkerOplogCompressorNone {
conf.Options.IncrSyncWorkerOplogCompressor = utils.VarIncrSyncWorkerOplogCompressorNone
}
if len(conf.Options.TunnelAddress) == 0 &&
conf.Options.Tunnel != utils.VarTunnelMock {
return fmt.Errorf("incr_sync.tunnel.address shouldn't be empty when incr_sync.tunnel != 'mock'")
}
if conf.Options.TunnelKafkaPartitionNumber <= 0 {
conf.Options.TunnelKafkaPartitionNumber = 1
} else if conf.Options.TunnelKafkaPartitionNumber > conf.Options.IncrSyncWorker {
return fmt.Errorf("tunnel.kafka.partition[%v] number should <= incr_sync.worker number[%v]",
conf.Options.TunnelKafkaPartitionNumber, conf.Options.IncrSyncWorker)
}
conf.Options.IncrSyncCollisionEnable = conf.Options.IncrSyncExecutor != 1
if conf.Options.Tunnel != utils.VarTunnelDirect &&
conf.Options.SyncMode != utils.VarSyncModeIncr {
return fmt.Errorf("full sync only support when tunnel type == direct")
}
// check source mongodb version >= 4.0 when change stream enable
if conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodChangeStream {
if conf.Options.MongoSUrl == "" && len(conf.Options.MongoUrls) > 1 {
return fmt.Errorf("mongo_s_url should be given when source is sharding and fetch method is change stream")
}
source, err := getSourceDbUrl()
if err != nil {
return err
}
conn, err := utils.NewMongoCommunityConn(source, utils.VarMongoConnectModeSecondaryPreferred, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile)
if err != nil {
return fmt.Errorf("connect source[%v] failed[%v]", utils.BlockMongoUrlPassword(source, "***"), err)
}
if isOk, err := utils.GetAndCompareVersion(conn, "4.0.1", conf.Options.SourceDBVersion); err != nil {
return fmt.Errorf("compare source[%v] to v4.0.1 failed[%v]", source, err)
} else if !isOk {
return fmt.Errorf("source[%v] version should >= 4.0.1 when incr_sync.mongo_fetch_method == %v",
conf.Options.MongoUrls[0], utils.VarIncrSyncMongoFetchMethodChangeStream)
}
} else {
// disable mongos if fetch method != 'change_stream'
// conf.Options.MongoSUrl = ""
}
// set compressor to none when tunnel message is not 'raw'
if conf.Options.TunnelMessage != utils.VarTunnelMessageRaw {
if conf.Options.IncrSyncWorkerOplogCompressor != utils.VarIncrSyncWorkerOplogCompressorNone {
conf.Options.IncrSyncWorkerOplogCompressor = utils.VarIncrSyncWorkerOplogCompressorNone
}
}
// disable oplog disk persist when sync mode isn't 'full'
if conf.Options.FullSyncReaderOplogStoreDisk {
if conf.Options.SyncMode != utils.VarSyncModeAll {
conf.Options.FullSyncReaderOplogStoreDisk = false
}
}
// only enable 'incr_sync.change_stream.watch_full_document' when tunnel != direct.
if conf.Options.IncrSyncChangeStreamWatchFullDocument {
if conf.Options.Tunnel == utils.VarTunnelDirect {
conf.Options.IncrSyncChangeStreamWatchFullDocument = false
}
}
// set start position to 0 when sync_mode != "incr"
if conf.Options.SyncMode != utils.VarSyncModeIncr {
conf.Options.CheckpointStartPosition = 1
}
/*****************************4. inner variables******************************/
if conf.Options.IncrSyncReaderDebug != utils.VarIncrSyncReaderDebugNone &&
conf.Options.IncrSyncReaderDebug != utils.VarIncrSyncReaderDebugDiscard &&
conf.Options.IncrSyncReaderDebug != utils.VarIncrSyncReaderDebugPrint {
return fmt.Errorf("incr_sync.reader.debug[%v] invalid", conf.Options.IncrSyncReaderDebug)
}
return nil
}