in cmd/collector/sanitize.go [73:278]
func checkDefaultValue() error {
// 1. global
if conf.Options.Id == "" {
conf.Options.Id = "mongoshake"
}
if conf.Options.FullSyncHTTPListenPort <= 0 {
conf.Options.FullSyncHTTPListenPort = 9101
}
if conf.Options.IncrSyncHTTPListenPort <= 0 {
conf.Options.IncrSyncHTTPListenPort = 9100
}
if conf.Options.SystemProfilePort <= 0 {
conf.Options.SystemProfilePort = 9200
}
if conf.Options.LogLevel == "" {
conf.Options.LogLevel = utils.VarLogLevelInfo
} else if conf.Options.LogLevel != utils.VarLogLevelDebug && conf.Options.LogLevel != utils.VarLogLevelInfo &&
conf.Options.LogLevel != utils.VarLogLevelWarning && conf.Options.LogLevel != utils.VarLogLevelError {
return fmt.Errorf("log.level should in {debug, info, warning, error}")
}
if conf.Options.LogFileName == "" {
conf.Options.LogFileName = "mongoshake.log"
}
if conf.Options.SyncMode == "" {
conf.Options.SyncMode = utils.VarSyncModeIncr
} else if conf.Options.SyncMode != utils.VarSyncModeAll &&
conf.Options.SyncMode != utils.VarSyncModeFull &&
conf.Options.SyncMode != utils.VarSyncModeIncr {
return fmt.Errorf("sync_mode should in {all, full, incr}")
}
if len(conf.Options.MongoSUrl) == 0 {
if len(conf.Options.MongoUrls) == 0 {
return fmt.Errorf("mongo_s_url and mongo_urls cannot be empty at the same time")
}
}
if conf.Options.MongoConnectMode == "" {
conf.Options.MongoConnectMode = utils.VarMongoConnectModeSecondaryPreferred
} else {
if conf.Options.MongoConnectMode != utils.VarMongoConnectModePrimary &&
conf.Options.MongoConnectMode != utils.VarMongoConnectModeSecondaryPreferred &&
conf.Options.MongoConnectMode != utils.VarMongoConnectModeSecondary &&
conf.Options.MongoConnectMode != utils.VarMongoConnectModeNearset &&
conf.Options.MongoConnectMode != utils.VarMongoConnectModeStandalone {
return fmt.Errorf("mongo_connect_mode should in {primary, secondaryPreferred, secondary, nearest, standalone}")
}
}
if conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodChangeStream {
if len(conf.Options.MongoSUrl) == 0 && len(conf.Options.MongoUrls) > 1 {
return fmt.Errorf("mongo_s_url should be given if source is sharding and incr_sync.mongo_fetch_method == %s",
utils.VarIncrSyncMongoFetchMethodChangeStream)
}
}
if conf.Options.CheckpointStorage == "" {
conf.Options.CheckpointStorage = utils.VarCheckpointStorageDatabase
} else if conf.Options.CheckpointStorage != utils.VarCheckpointStorageDatabase &&
conf.Options.CheckpointStorage != utils.VarCheckpointStorageApi {
return fmt.Errorf("checkpoint.storage should in {database, api}")
}
if conf.Options.CheckpointStorageUrl == "" {
// do nothing here
}
if conf.Options.CheckpointStorageDb == "" {
conf.Options.CheckpointStorageDb = utils.VarCheckpointStorageDbReplicaDefault
}
if conf.Options.CheckpointStorageCollection == "" {
conf.Options.CheckpointStorageCollection = utils.VarCheckpointStorageCollectionDefault
}
if conf.Options.CheckpointStartPosition <= 0 {
conf.Options.CheckpointStartPosition = 1
}
if conf.Options.CheckpointInterval <= 0 {
conf.Options.CheckpointInterval = 5000 // ms
}
// 2. full sync
if conf.Options.FullSyncReaderCollectionParallel <= 0 {
conf.Options.FullSyncReaderCollectionParallel = 6
}
if conf.Options.FullSyncReaderWriteDocumentParallel <= 0 {
conf.Options.FullSyncReaderWriteDocumentParallel = 8
}
if conf.Options.FullSyncReaderParallelThread <= 0 {
conf.Options.FullSyncReaderParallelThread = 1
} else if conf.Options.FullSyncReaderParallelThread > 128 {
return fmt.Errorf("full_sync.reader.parallel_thread should <= 128")
}
if conf.Options.FullSyncReaderParallelIndex == "" {
conf.Options.FullSyncReaderParallelIndex = "_id"
}
if conf.Options.FullSyncReaderDocumentBatchSize <= 0 {
conf.Options.FullSyncReaderDocumentBatchSize = 128
}
if conf.Options.FullSyncReaderFetchBatchSize <= 0 {
conf.Options.FullSyncReaderFetchBatchSize = 1024
}
if conf.Options.FullSyncCreateIndex == "" {
conf.Options.FullSyncCreateIndex = utils.VarFullSyncCreateIndexForeground
} else if conf.Options.FullSyncCreateIndex != utils.VarFullSyncCreateIndexNone &&
conf.Options.FullSyncCreateIndex != utils.VarFullSyncCreateIndexForeground &&
conf.Options.FullSyncCreateIndex != utils.VarFullSyncCreateIndexBackground {
return fmt.Errorf("full_sync.create_index should in {none, foreground, background}")
}
if conf.Options.FullSyncReaderOplogStoreDiskMaxSize <= 0 {
conf.Options.FullSyncReaderOplogStoreDiskMaxSize = 256000
}
// 3. incr sync
if conf.Options.IncrSyncMongoFetchMethod == "" {
conf.Options.IncrSyncMongoFetchMethod = utils.VarIncrSyncMongoFetchMethodOplog
} else if conf.Options.IncrSyncMongoFetchMethod != utils.VarIncrSyncMongoFetchMethodOplog &&
conf.Options.IncrSyncMongoFetchMethod != utils.VarIncrSyncMongoFetchMethodChangeStream {
return fmt.Errorf("incr_sync.mongo_fetch_method should in {oplog, change_stream}")
}
if conf.Options.IncrSyncShardKey == "" {
conf.Options.IncrSyncShardKey = utils.VarIncrSyncShardKeyCollection
} else if conf.Options.IncrSyncShardKey != utils.VarIncrSyncShardKeyAuto &&
conf.Options.IncrSyncShardKey != utils.VarIncrSyncShardKeyId &&
conf.Options.IncrSyncShardKey != utils.VarIncrSyncShardKeyCollection {
return fmt.Errorf("incr_sync.shard_key should in {auto, id, collection}")
}
if len(conf.Options.IncrSyncShardByObjectIdWhiteList) != 0 {
if conf.Options.IncrSyncShardKey != utils.VarIncrSyncShardKeyCollection {
return fmt.Errorf("incr_sync.shard_by_object_id_whitelist should only be set when 'incr_sync.shard_key == collection'")
}
}
if conf.Options.IncrSyncWorker == 0 {
conf.Options.IncrSyncWorker = 8
} else if conf.Options.IncrSyncWorker <= 0 || conf.Options.IncrSyncWorker > 256 {
return fmt.Errorf("incr_sync.worker[%v] should in range [1, 256]", conf.Options.IncrSyncWorker)
}
if conf.Options.IncrSyncTunnelWriteThread == 0 {
conf.Options.IncrSyncTunnelWriteThread = conf.Options.IncrSyncWorker
} else if conf.Options.IncrSyncTunnelWriteThread%conf.Options.IncrSyncWorker != 0 {
return fmt.Errorf("incr_sync.tunnel.write_thread[%v] must be an interge multiple of incr_sync.worker[%v]",
conf.Options.IncrSyncTunnelWriteThread, conf.Options.IncrSyncWorker)
}
if conf.Options.IncrSyncWorkerOplogCompressor == "" {
conf.Options.IncrSyncWorkerOplogCompressor = utils.VarIncrSyncWorkerOplogCompressorNone
} else if conf.Options.IncrSyncWorkerOplogCompressor != utils.VarIncrSyncWorkerOplogCompressorNone &&
conf.Options.IncrSyncWorkerOplogCompressor != utils.VarIncrSyncWorkerOplogCompressorGzip &&
conf.Options.IncrSyncWorkerOplogCompressor != utils.VarIncrSyncWorkerOplogCompressorZlib &&
conf.Options.IncrSyncWorkerOplogCompressor != utils.VarIncrSyncWorkerOplogCompressorDeflate &&
conf.Options.IncrSyncWorkerOplogCompressor != utils.VarIncrSyncWorkerOplogCompressorSnappy {
return fmt.Errorf("incr_sync.worker.oplog_compressor in {none, gzip, zlib, deflate, snappy}")
}
if conf.Options.IncrSyncTargetDelay < 0 {
conf.Options.IncrSyncTargetDelay = 0
}
if conf.Options.IncrSyncWorkerBatchQueueSize <= 0 {
conf.Options.IncrSyncWorkerBatchQueueSize = 64
}
if conf.Options.IncrSyncAdaptiveBatchingMaxSize <= 0 {
conf.Options.IncrSyncAdaptiveBatchingMaxSize = 1024
}
if conf.Options.IncrSyncFetcherBufferCapacity <= 0 {
conf.Options.IncrSyncFetcherBufferCapacity = 256
}
if conf.Options.IncrSyncReaderFetchBatchSize <= 0 {
conf.Options.IncrSyncReaderFetchBatchSize = 1024
}
if conf.Options.Tunnel == "" {
conf.Options.Tunnel = utils.VarTunnelDirect
} else if conf.Options.Tunnel != utils.VarTunnelDirect &&
conf.Options.Tunnel != utils.VarTunnelRpc &&
conf.Options.Tunnel != utils.VarTunnelTcp &&
conf.Options.Tunnel != utils.VarTunnelFile &&
conf.Options.Tunnel != utils.VarTunnelKafka &&
conf.Options.Tunnel != utils.VarTunnelMock {
return fmt.Errorf("incr_sync.tunnel in {direct, rpc, tcp, file, kafka, mock}")
}
if conf.Options.TunnelMessage == "" {
conf.Options.TunnelMessage = utils.VarTunnelMessageRaw
} else if conf.Options.TunnelMessage != utils.VarTunnelMessageRaw &&
conf.Options.TunnelMessage != utils.VarTunnelMessageBson &&
conf.Options.TunnelMessage != utils.VarTunnelMessageJson {
return fmt.Errorf("incr_sync.tunnel.message in {raw, bson, json}")
}
if conf.Options.IncrSyncExecutor <= 0 {
conf.Options.IncrSyncExecutor = 1
}
if conf.Options.IncrSyncConflictWriteTo == "" {
conf.Options.IncrSyncConflictWriteTo = utils.VarIncrSyncConflictWriteToNone
} else if conf.Options.IncrSyncConflictWriteTo != utils.VarIncrSyncConflictWriteToNone &&
conf.Options.IncrSyncConflictWriteTo != utils.VarIncrSyncConflictWriteToDb &&
conf.Options.IncrSyncConflictWriteTo != utils.VarIncrSyncConflictWriteToSdk {
return fmt.Errorf("incr_sync.conflict_write_to in {none, db, sdk}")
}
if conf.Options.IncrSyncReaderBufferTime <= 0 {
conf.Options.IncrSyncReaderBufferTime = 1
}
/********************************/
// set utils
utils.AppDatabase = conf.Options.CheckpointStorageDb
utils.APPConflictDatabase = fmt.Sprintf("%s_%s", utils.AppDatabase, "_conflict")
filter.NsShouldBeIgnore[utils.AppDatabase+"."] = true
filter.NsShouldBeIgnore[utils.APPConflictDatabase+"."] = true
return nil
}