cmd/collector/sanitize.go (399 lines of code) (raw):
package main
import (
"fmt"
conf "github.com/alibaba/MongoShake/v2/collector/configure"
"github.com/alibaba/MongoShake/v2/collector/filter"
utils "github.com/alibaba/MongoShake/v2/common"
)
// getSourceDbUrl picks the address used to talk to the source database.
// mongo_s_url wins when it is set; otherwise the first entry of mongo_urls
// is used. When both are empty an empty string and an error are returned.
func getSourceDbUrl() (string, error) {
	if sURL := conf.Options.MongoSUrl; len(sURL) != 0 {
		return sURL, nil
	}
	if urls := conf.Options.MongoUrls; len(urls) > 0 {
		return urls[0], nil
	}
	return "", fmt.Errorf("mongo_urls && mongo_s_url should not all be empty")
}
// SanitizeOptions runs every configuration check in a fixed order and stops
// at the first one that fails, returning its error. It returns nil when all
// checks pass.
func SanitizeOptions() error {
	// The order matters: later steps read values that earlier steps fill in.
	steps := []func() error{
		handleDeprecateConf, // compatible with old version
		checkDefaultValue,   // default value
		checkConnection,     // check connection
		checkConflict,       // judge conflict value
	}
	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}
	return nil
}
// handleDeprecateConf copies values given through deprecated option names
// into the options that replace them. A value is only carried over when the
// replacement option is still unset, so an explicitly configured replacement
// is never overwritten. It always returns nil.
func handleDeprecateConf() error {
	// IncrSyncTunnel (deprecated since v2.4.1) -> Tunnel
	if conf.Options.IncrSyncTunnel != "" && conf.Options.Tunnel == "" {
		conf.Options.Tunnel = conf.Options.IncrSyncTunnel
	}

	// IncrSyncTunnelAddress (deprecated since v2.4.1) -> TunnelAddress
	if len(conf.Options.IncrSyncTunnelAddress) != 0 && len(conf.Options.TunnelAddress) == 0 {
		conf.Options.TunnelAddress = conf.Options.IncrSyncTunnelAddress
	}

	// IncrSyncTunnelMessage (deprecated since v2.4.1) -> TunnelMessage
	if conf.Options.IncrSyncTunnelMessage != "" && conf.Options.TunnelMessage == "" {
		conf.Options.TunnelMessage = conf.Options.IncrSyncTunnelMessage
	}

	// HTTPListenPort (deprecated since v2.4.1) -> IncrSyncHTTPListenPort
	if conf.Options.IncrSyncHTTPListenPort == 0 && conf.Options.HTTPListenPort != 0 {
		conf.Options.IncrSyncHTTPListenPort = conf.Options.HTTPListenPort
	}

	// SystemProfile (deprecated since v2.4.1) -> SystemProfilePort
	if conf.Options.SystemProfilePort == 0 && conf.Options.SystemProfile != 0 {
		conf.Options.SystemProfilePort = conf.Options.SystemProfile
	}

	return nil
}
// checkDefaultValue fills in defaults for options that were left unset and
// rejects options whose value is outside the accepted set. It walks the
// options in three groups (global, full sync, incr sync) and returns an error
// for the first invalid value it meets. On success it also publishes the
// checkpoint database names to the utils and filter packages.
func checkDefaultValue() error {
	// ---------------------------- 1. global ----------------------------
	if conf.Options.Id == "" {
		conf.Options.Id = "mongoshake"
	}
	if conf.Options.FullSyncHTTPListenPort <= 0 {
		conf.Options.FullSyncHTTPListenPort = 9101
	}
	if conf.Options.IncrSyncHTTPListenPort <= 0 {
		conf.Options.IncrSyncHTTPListenPort = 9100
	}
	if conf.Options.SystemProfilePort <= 0 {
		conf.Options.SystemProfilePort = 9200
	}

	switch level := conf.Options.LogLevel; {
	case level == "":
		conf.Options.LogLevel = utils.VarLogLevelInfo
	case !(level == utils.VarLogLevelDebug ||
		level == utils.VarLogLevelInfo ||
		level == utils.VarLogLevelWarning ||
		level == utils.VarLogLevelError):
		return fmt.Errorf("log.level should in {debug, info, warning, error}")
	}

	if conf.Options.LogFileName == "" {
		conf.Options.LogFileName = "mongoshake.log"
	}

	switch mode := conf.Options.SyncMode; {
	case mode == "":
		conf.Options.SyncMode = utils.VarSyncModeIncr
	case !(mode == utils.VarSyncModeAll ||
		mode == utils.VarSyncModeFull ||
		mode == utils.VarSyncModeIncr):
		return fmt.Errorf("sync_mode should in {all, full, incr}")
	}

	// At least one source address is required.
	if len(conf.Options.MongoSUrl) == 0 && len(conf.Options.MongoUrls) == 0 {
		return fmt.Errorf("mongo_s_url and mongo_urls cannot be empty at the same time")
	}

	switch connectMode := conf.Options.MongoConnectMode; {
	case connectMode == "":
		conf.Options.MongoConnectMode = utils.VarMongoConnectModeSecondaryPreferred
	case !(connectMode == utils.VarMongoConnectModePrimary ||
		connectMode == utils.VarMongoConnectModeSecondaryPreferred ||
		connectMode == utils.VarMongoConnectModeSecondary ||
		connectMode == utils.VarMongoConnectModeNearset ||
		connectMode == utils.VarMongoConnectModeStandalone):
		return fmt.Errorf("mongo_connect_mode should in {primary, secondaryPreferred, secondary, nearest, standalone}")
	}

	// More than one mongo_urls entry is treated as a sharded source; change
	// stream then needs mongo_s_url.
	if conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodChangeStream &&
		len(conf.Options.MongoSUrl) == 0 &&
		len(conf.Options.MongoUrls) > 1 {
		return fmt.Errorf("mongo_s_url should be given if source is sharding and incr_sync.mongo_fetch_method == %s",
			utils.VarIncrSyncMongoFetchMethodChangeStream)
	}

	switch storage := conf.Options.CheckpointStorage; {
	case storage == "":
		conf.Options.CheckpointStorage = utils.VarCheckpointStorageDatabase
	case !(storage == utils.VarCheckpointStorageDatabase ||
		storage == utils.VarCheckpointStorageApi):
		return fmt.Errorf("checkpoint.storage should in {database, api}")
	}

	// An empty CheckpointStorageUrl is deliberately left untouched here.

	if conf.Options.CheckpointStorageDb == "" {
		conf.Options.CheckpointStorageDb = utils.VarCheckpointStorageDbReplicaDefault
	}
	if conf.Options.CheckpointStorageCollection == "" {
		conf.Options.CheckpointStorageCollection = utils.VarCheckpointStorageCollectionDefault
	}
	if conf.Options.CheckpointStartPosition <= 0 {
		conf.Options.CheckpointStartPosition = 1
	}
	if conf.Options.CheckpointInterval <= 0 {
		conf.Options.CheckpointInterval = 5000 // ms
	}

	// --------------------------- 2. full sync ---------------------------
	if conf.Options.FullSyncReaderCollectionParallel <= 0 {
		conf.Options.FullSyncReaderCollectionParallel = 6
	}
	if conf.Options.FullSyncReaderWriteDocumentParallel <= 0 {
		conf.Options.FullSyncReaderWriteDocumentParallel = 8
	}

	switch threads := conf.Options.FullSyncReaderParallelThread; {
	case threads <= 0:
		conf.Options.FullSyncReaderParallelThread = 1
	case threads > 128:
		return fmt.Errorf("full_sync.reader.parallel_thread should <= 128")
	}

	if conf.Options.FullSyncReaderParallelIndex == "" {
		conf.Options.FullSyncReaderParallelIndex = "_id"
	}
	if conf.Options.FullSyncReaderDocumentBatchSize <= 0 {
		conf.Options.FullSyncReaderDocumentBatchSize = 128
	}
	if conf.Options.FullSyncReaderFetchBatchSize <= 0 {
		conf.Options.FullSyncReaderFetchBatchSize = 1024
	}

	switch createIndex := conf.Options.FullSyncCreateIndex; {
	case createIndex == "":
		conf.Options.FullSyncCreateIndex = utils.VarFullSyncCreateIndexForeground
	case !(createIndex == utils.VarFullSyncCreateIndexNone ||
		createIndex == utils.VarFullSyncCreateIndexForeground ||
		createIndex == utils.VarFullSyncCreateIndexBackground):
		return fmt.Errorf("full_sync.create_index should in {none, foreground, background}")
	}

	if conf.Options.FullSyncReaderOplogStoreDiskMaxSize <= 0 {
		conf.Options.FullSyncReaderOplogStoreDiskMaxSize = 256000
	}

	// --------------------------- 3. incr sync ---------------------------
	switch fetchMethod := conf.Options.IncrSyncMongoFetchMethod; {
	case fetchMethod == "":
		conf.Options.IncrSyncMongoFetchMethod = utils.VarIncrSyncMongoFetchMethodOplog
	case !(fetchMethod == utils.VarIncrSyncMongoFetchMethodOplog ||
		fetchMethod == utils.VarIncrSyncMongoFetchMethodChangeStream):
		return fmt.Errorf("incr_sync.mongo_fetch_method should in {oplog, change_stream}")
	}

	switch shardKey := conf.Options.IncrSyncShardKey; {
	case shardKey == "":
		conf.Options.IncrSyncShardKey = utils.VarIncrSyncShardKeyCollection
	case !(shardKey == utils.VarIncrSyncShardKeyAuto ||
		shardKey == utils.VarIncrSyncShardKeyId ||
		shardKey == utils.VarIncrSyncShardKeyCollection):
		return fmt.Errorf("incr_sync.shard_key should in {auto, id, collection}")
	}

	// The object-id whitelist is only accepted together with shard_key == collection.
	if len(conf.Options.IncrSyncShardByObjectIdWhiteList) != 0 &&
		conf.Options.IncrSyncShardKey != utils.VarIncrSyncShardKeyCollection {
		return fmt.Errorf("incr_sync.shard_by_object_id_whitelist should only be set when 'incr_sync.shard_key == collection'")
	}

	switch workers := conf.Options.IncrSyncWorker; {
	case workers == 0:
		conf.Options.IncrSyncWorker = 8
	case workers <= 0 || workers > 256:
		return fmt.Errorf("incr_sync.worker[%v] should in range [1, 256]", conf.Options.IncrSyncWorker)
	}

	// IncrSyncWorker is guaranteed non-zero from here on, so the modulo is safe.
	switch writeThreads := conf.Options.IncrSyncTunnelWriteThread; {
	case writeThreads == 0:
		conf.Options.IncrSyncTunnelWriteThread = conf.Options.IncrSyncWorker
	case writeThreads%conf.Options.IncrSyncWorker != 0:
		return fmt.Errorf("incr_sync.tunnel.write_thread[%v] must be an interge multiple of incr_sync.worker[%v]",
			conf.Options.IncrSyncTunnelWriteThread, conf.Options.IncrSyncWorker)
	}

	switch compressor := conf.Options.IncrSyncWorkerOplogCompressor; {
	case compressor == "":
		conf.Options.IncrSyncWorkerOplogCompressor = utils.VarIncrSyncWorkerOplogCompressorNone
	case !(compressor == utils.VarIncrSyncWorkerOplogCompressorNone ||
		compressor == utils.VarIncrSyncWorkerOplogCompressorGzip ||
		compressor == utils.VarIncrSyncWorkerOplogCompressorZlib ||
		compressor == utils.VarIncrSyncWorkerOplogCompressorDeflate ||
		compressor == utils.VarIncrSyncWorkerOplogCompressorSnappy):
		return fmt.Errorf("incr_sync.worker.oplog_compressor in {none, gzip, zlib, deflate, snappy}")
	}

	if conf.Options.IncrSyncTargetDelay < 0 {
		conf.Options.IncrSyncTargetDelay = 0
	}
	if conf.Options.IncrSyncWorkerBatchQueueSize <= 0 {
		conf.Options.IncrSyncWorkerBatchQueueSize = 64
	}
	if conf.Options.IncrSyncAdaptiveBatchingMaxSize <= 0 {
		conf.Options.IncrSyncAdaptiveBatchingMaxSize = 1024
	}
	if conf.Options.IncrSyncFetcherBufferCapacity <= 0 {
		conf.Options.IncrSyncFetcherBufferCapacity = 256
	}
	if conf.Options.IncrSyncReaderFetchBatchSize <= 0 {
		conf.Options.IncrSyncReaderFetchBatchSize = 1024
	}

	switch tunnel := conf.Options.Tunnel; {
	case tunnel == "":
		conf.Options.Tunnel = utils.VarTunnelDirect
	case !(tunnel == utils.VarTunnelDirect ||
		tunnel == utils.VarTunnelRpc ||
		tunnel == utils.VarTunnelTcp ||
		tunnel == utils.VarTunnelFile ||
		tunnel == utils.VarTunnelKafka ||
		tunnel == utils.VarTunnelMock):
		return fmt.Errorf("incr_sync.tunnel in {direct, rpc, tcp, file, kafka, mock}")
	}

	switch message := conf.Options.TunnelMessage; {
	case message == "":
		conf.Options.TunnelMessage = utils.VarTunnelMessageRaw
	case !(message == utils.VarTunnelMessageRaw ||
		message == utils.VarTunnelMessageBson ||
		message == utils.VarTunnelMessageJson):
		return fmt.Errorf("incr_sync.tunnel.message in {raw, bson, json}")
	}

	if conf.Options.IncrSyncExecutor <= 0 {
		conf.Options.IncrSyncExecutor = 1
	}

	switch writeTo := conf.Options.IncrSyncConflictWriteTo; {
	case writeTo == "":
		conf.Options.IncrSyncConflictWriteTo = utils.VarIncrSyncConflictWriteToNone
	case !(writeTo == utils.VarIncrSyncConflictWriteToNone ||
		writeTo == utils.VarIncrSyncConflictWriteToDb ||
		writeTo == utils.VarIncrSyncConflictWriteToSdk):
		return fmt.Errorf("incr_sync.conflict_write_to in {none, db, sdk}")
	}

	if conf.Options.IncrSyncReaderBufferTime <= 0 {
		conf.Options.IncrSyncReaderBufferTime = 1
	}

	// ------------------------- publish to utils -------------------------
	utils.AppDatabase = conf.Options.CheckpointStorageDb
	// NOTE(review): the "%s_%s" format combined with the "_conflict" argument
	// produces two consecutive underscores ("<db>__conflict"). Confirm whether
	// that is intended before changing it; existing deployments may rely on
	// the current name.
	utils.APPConflictDatabase = fmt.Sprintf("%s_%s", utils.AppDatabase, "_conflict")
	// Register both databases in the filter's ignore table.
	filter.NsShouldBeIgnore[utils.AppDatabase+"."] = true
	filter.NsShouldBeIgnore[utils.APPConflictDatabase+"."] = true

	return nil
}
// checkConnection verifies that every configured MongoDB endpoint can be
// connected to, and records the target and source database versions in
// conf.Options.
//
// It checks, in order: every entry of mongo_urls, mongo_cs_url when set, and
// (for the direct tunnel with both executor debug flags off) every tunnel
// address. It then connects to the source chosen by getSourceDbUrl and checks
// its version against the minimum threshold.
//
// Any connection failure is returned as an error with the password in the URL
// masked.
func checkConnection() error {
	// check mongo_urls
	for _, mongo := range conf.Options.MongoUrls {
		_, err := utils.NewMongoCommunityConn(mongo, conf.Options.MongoConnectMode, true,
			utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile)
		if err != nil {
			return fmt.Errorf("connect source mongodb[%v] failed[%v]", utils.BlockMongoUrlPassword(mongo, "***"), err)
		}
	}

	// check mongo_cs_url
	if conf.Options.MongoCsUrl != "" {
		_, err := utils.NewMongoCommunityConn(conf.Options.MongoCsUrl, utils.VarMongoConnectModeSecondaryPreferred,
			true, utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile)
		if err != nil {
			return fmt.Errorf("connect config-server[%v] failed[%v]", utils.BlockMongoUrlPassword(conf.Options.MongoCsUrl, "***"), err)
		}
	}

	// check tunnel address
	// no need to check target connection when debug flag set.
	if conf.Options.Tunnel == utils.VarTunnelDirect &&
		!conf.Options.FullSyncExecutorDebug &&
		!conf.Options.IncrSyncExecutorDebug {
		for i, mongo := range conf.Options.TunnelAddress {
			targetConn, err := utils.NewMongoCommunityConn(mongo, conf.Options.MongoConnectMode, true,
				utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault, conf.Options.TunnelMongoSslRootCaFile)
			if err != nil {
				return fmt.Errorf("connect target tunnel mongodb[%v] failed[%v]", utils.BlockMongoUrlPassword(mongo, "***"), err)
			}

			// The target version is taken from the first tunnel address only.
			// A failed version lookup is ignored here, as in the source lookup below.
			if i == 0 {
				conf.Options.TargetDBVersion, _ = utils.GetDBVersion(targetConn)
			}
		}
	}

	// set source version
	source, err := getSourceDbUrl()
	if err != nil {
		return err
	}

	// The source may be mongo_s_url, which none of the checks above has
	// connected to, so a failure here must be reported instead of handing a
	// possibly unusable connection to the version helpers.
	sourceConn, err := utils.NewMongoCommunityConn(source, utils.VarMongoConnectModeSecondaryPreferred, true,
		utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile)
	if err != nil {
		return fmt.Errorf("connect source mongodb[%v] failed[%v]", utils.BlockMongoUrlPassword(source, "***"), err)
	}

	// ignore error
	conf.Options.SourceDBVersion, _ = utils.GetDBVersion(sourceConn)

	// NOTE(review): the threshold passed below is "2.6.0" while the error
	// message says ">= 3.0"; confirm which of the two is the intended minimum.
	if ok, err := utils.GetAndCompareVersion(sourceConn, "2.6.0",
		conf.Options.SourceDBVersion); err != nil {
		return err
	} else if !ok {
		return fmt.Errorf("source MongoDB version[%v] should >= 3.0", conf.Options.SourceDBVersion)
	}

	return nil
}
// checkConflict validates options that depend on each other and adjusts the
// ones that have to follow from another setting. It is expected to run after
// checkDefaultValue, so defaults are already filled in.
//
// It returns an error for the first incompatible combination it finds. Along
// the way it may rewrite conf.Options fields: the listen ports (through
// utils.MayBeRandom), checkpoint.storage.url, incr_sync.worker, the oplog
// compressor, the kafka partition number, the collision flag, the oplog disk
// persist flag, the change-stream full-document flag and the checkpoint start
// position.
func checkConflict() error {
	/*****************************1. global settings******************************/
	// http_profile & system_profile
	conf.Options.FullSyncHTTPListenPort = utils.MayBeRandom(conf.Options.FullSyncHTTPListenPort)
	conf.Options.IncrSyncHTTPListenPort = utils.MayBeRandom(conf.Options.IncrSyncHTTPListenPort)
	if conf.Options.FullSyncHTTPListenPort == conf.Options.IncrSyncHTTPListenPort {
		return fmt.Errorf("full_sync.http_port should not equal to incr_sync.http_port")
	}
	conf.Options.SystemProfilePort = utils.MayBeRandom(conf.Options.SystemProfilePort)

	// check mongo_cs_url: more than one mongo_urls entry is treated as a sharded source
	if conf.Options.MongoCsUrl == "" && len(conf.Options.MongoUrls) > 1 {
		return fmt.Errorf("mongo_cs_url be config server address when source MongoDB is sharding")
	}

	// set checkpoint.storage.url if empty
	if conf.Options.CheckpointStorageUrl == "" {
		if len(conf.Options.MongoUrls) == 1 {
			// replica-set
			conf.Options.CheckpointStorageUrl = conf.Options.MongoUrls[0]
		} else if len(conf.Options.MongoSUrl) > 0 {
			conf.Options.CheckpointStorageUrl = conf.Options.MongoSUrl
		} else {
			return fmt.Errorf("checkpoint.storage.url should be given when source is sharding")
		}
	}

	// avoid the typo of mongo urls
	if utils.HasDuplicated(conf.Options.MongoUrls) {
		return fmt.Errorf("mongo urls were duplicated")
	}

	// quorum
	if conf.Options.MasterQuorum && conf.Options.CheckpointStorage != utils.VarCheckpointStorageDatabase {
		return fmt.Errorf("context storage should set to 'database' while master election enabled")
	}

	// filter: black list and white list are mutually exclusive
	if len(conf.Options.FilterNamespaceBlack) != 0 && len(conf.Options.FilterNamespaceWhite) != 0 {
		return fmt.Errorf("at most one of {filter.namespace.black, filter.namespace.white} can be given")
	}
	// filter - filter.pass.special.db
	if len(conf.Options.FilterPassSpecialDb) != 0 {
		// init ns
		filter.InitNs(conf.Options.FilterPassSpecialDb)
	}

	// special variable
	if conf.Options.SpecialSourceDBFlag != "" &&
		conf.Options.SpecialSourceDBFlag != utils.VarSpecialSourceDBFlagAliyunServerless {
		return fmt.Errorf("special.source.db.flag should be empty or 'aliyun_serverless'")
	}
	if conf.Options.SpecialSourceDBFlag == utils.VarSpecialSourceDBFlagAliyunServerless {
		if conf.Options.IncrSyncMongoFetchMethod != utils.VarIncrSyncMongoFetchMethodChangeStream {
			return fmt.Errorf("incr_sync.mongo_fetch_method must be 'change_stream' when special.source.db.flag is set")
		}
	}

	/*****************************2. full sync******************************/

	/*****************************3. incr sync******************************/
	// set incr_sync.worker = shards number if source is sharding
	if len(conf.Options.MongoUrls) > 1 {
		if conf.Options.IncrSyncWorker != len(conf.Options.MongoUrls) &&
			conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodOplog {
			// only change when incr_sync.mongo_fetch_method = oplog
			conf.Options.IncrSyncWorker = len(conf.Options.MongoUrls)
		}
		if conf.Options.FilterDDLEnable &&
			conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodOplog {
			return fmt.Errorf("DDL is not support for sharding when incr_sync.mongo_fetch_method == 'oplog'")
		}
	}

	// the direct tunnel forces the compressor to none
	if conf.Options.Tunnel == utils.VarTunnelDirect &&
		conf.Options.IncrSyncWorkerOplogCompressor != utils.VarIncrSyncWorkerOplogCompressorNone {
		conf.Options.IncrSyncWorkerOplogCompressor = utils.VarIncrSyncWorkerOplogCompressorNone
	}

	if len(conf.Options.TunnelAddress) == 0 &&
		conf.Options.Tunnel != utils.VarTunnelMock {
		return fmt.Errorf("incr_sync.tunnel.address shouldn't be empty when incr_sync.tunnel != 'mock'")
	}

	if conf.Options.TunnelKafkaPartitionNumber <= 0 {
		conf.Options.TunnelKafkaPartitionNumber = 1
	} else if conf.Options.TunnelKafkaPartitionNumber > conf.Options.IncrSyncWorker {
		return fmt.Errorf("tunnel.kafka.partition[%v] number should <= incr_sync.worker number[%v]",
			conf.Options.TunnelKafkaPartitionNumber, conf.Options.IncrSyncWorker)
	}

	// collision handling is enabled whenever the executor count is not exactly 1
	conf.Options.IncrSyncCollisionEnable = conf.Options.IncrSyncExecutor != 1

	if conf.Options.Tunnel != utils.VarTunnelDirect &&
		conf.Options.SyncMode != utils.VarSyncModeIncr {
		return fmt.Errorf("full sync only support when tunnel type == direct")
	}

	// check source mongodb version >= 4.0.1 when change stream enable
	if conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodChangeStream {
		if conf.Options.MongoSUrl == "" && len(conf.Options.MongoUrls) > 1 {
			return fmt.Errorf("mongo_s_url should be given when source is sharding and fetch method is change stream")
		}

		source, err := getSourceDbUrl()
		if err != nil {
			return err
		}
		// Mask the password once and reuse it in every message below. The
		// source may come from mongo_s_url with mongo_urls empty, so messages
		// must use this value and never index conf.Options.MongoUrls.
		maskedSource := utils.BlockMongoUrlPassword(source, "***")

		conn, err := utils.NewMongoCommunityConn(source, utils.VarMongoConnectModeSecondaryPreferred, true,
			utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile)
		if err != nil {
			return fmt.Errorf("connect source[%v] failed[%v]", maskedSource, err)
		}
		if isOk, err := utils.GetAndCompareVersion(conn, "4.0.1", conf.Options.SourceDBVersion); err != nil {
			return fmt.Errorf("compare source[%v] to v4.0.1 failed[%v]", maskedSource, err)
		} else if !isOk {
			return fmt.Errorf("source[%v] version should >= 4.0.1 when incr_sync.mongo_fetch_method == %v",
				maskedSource, utils.VarIncrSyncMongoFetchMethodChangeStream)
		}
	}
	// When the fetch method is not 'change_stream', mongo_s_url is left as is;
	// clearing it here (conf.Options.MongoSUrl = "") is intentionally disabled.

	// set compressor to none when tunnel message is not 'raw'
	if conf.Options.TunnelMessage != utils.VarTunnelMessageRaw {
		if conf.Options.IncrSyncWorkerOplogCompressor != utils.VarIncrSyncWorkerOplogCompressorNone {
			conf.Options.IncrSyncWorkerOplogCompressor = utils.VarIncrSyncWorkerOplogCompressorNone
		}
	}

	// disable oplog disk persist when sync mode isn't 'all'
	if conf.Options.FullSyncReaderOplogStoreDisk {
		if conf.Options.SyncMode != utils.VarSyncModeAll {
			conf.Options.FullSyncReaderOplogStoreDisk = false
		}
	}

	// only enable 'incr_sync.change_stream.watch_full_document' when tunnel != direct.
	if conf.Options.IncrSyncChangeStreamWatchFullDocument {
		if conf.Options.Tunnel == utils.VarTunnelDirect {
			conf.Options.IncrSyncChangeStreamWatchFullDocument = false
		}
	}

	// reset start position to 1 when sync_mode != "incr"
	if conf.Options.SyncMode != utils.VarSyncModeIncr {
		conf.Options.CheckpointStartPosition = 1
	}

	/*****************************4. inner variables******************************/
	if conf.Options.IncrSyncReaderDebug != utils.VarIncrSyncReaderDebugNone &&
		conf.Options.IncrSyncReaderDebug != utils.VarIncrSyncReaderDebugDiscard &&
		conf.Options.IncrSyncReaderDebug != utils.VarIncrSyncReaderDebugPrint {
		return fmt.Errorf("incr_sync.reader.debug[%v] invalid", conf.Options.IncrSyncReaderDebug)
	}

	return nil
}