func checkConflict()

in cmd/collector/sanitize.go [338:482]


func checkConflict() error {
	/*****************************1. global settings******************************/
	// http_profile & system_profile
	conf.Options.FullSyncHTTPListenPort = utils.MayBeRandom(conf.Options.FullSyncHTTPListenPort)
	conf.Options.IncrSyncHTTPListenPort = utils.MayBeRandom(conf.Options.IncrSyncHTTPListenPort)
	if conf.Options.FullSyncHTTPListenPort == conf.Options.IncrSyncHTTPListenPort {
		return fmt.Errorf("full_sync.http_port should not equal to incr_sync.http_port")
	}

	conf.Options.SystemProfilePort = utils.MayBeRandom(conf.Options.SystemProfilePort)
	// check mongo_cs_url
	if conf.Options.MongoCsUrl == "" && len(conf.Options.MongoUrls) > 1 {
		return fmt.Errorf("mongo_cs_url be config server address when source MongoDB is sharding")
	}
	// set checkpoint.storage.url if empty
	if conf.Options.CheckpointStorageUrl == "" {
		if len(conf.Options.MongoUrls) == 1 {
			// replica-set
			conf.Options.CheckpointStorageUrl = conf.Options.MongoUrls[0]
		} else if len(conf.Options.MongoSUrl) > 0 {
			conf.Options.CheckpointStorageUrl = conf.Options.MongoSUrl
		} else {
			return fmt.Errorf("checkpoint.storage.url should be given when source is sharding")
		}
	}
	// avoid the typo of mongo urls
	if utils.HasDuplicated(conf.Options.MongoUrls) {
		return fmt.Errorf("mongo urls were duplicated")
	}
	// quorm
	if conf.Options.MasterQuorum && conf.Options.CheckpointStorage != utils.VarCheckpointStorageDatabase {
		return fmt.Errorf("context storage should set to 'database' while master election enabled")
	}
	// filter
	if len(conf.Options.FilterNamespaceBlack) != 0 && len(conf.Options.FilterNamespaceWhite) != 0 {
		return fmt.Errorf("at most one of {filter.namespace.black, filter.namespace.white} can be given")
	}
	// filter - filter.pass.special.db
	if len(conf.Options.FilterPassSpecialDb) != 0 {
		// init ns
		filter.InitNs(conf.Options.FilterPassSpecialDb)
	}
	// special variable
	if conf.Options.SpecialSourceDBFlag != "" &&
		conf.Options.SpecialSourceDBFlag != utils.VarSpecialSourceDBFlagAliyunServerless {
		return fmt.Errorf("special.source.db.flag should be empty or 'aliyun_serverless'")
	}
	if conf.Options.SpecialSourceDBFlag == utils.VarSpecialSourceDBFlagAliyunServerless {
		if conf.Options.IncrSyncMongoFetchMethod != utils.VarIncrSyncMongoFetchMethodChangeStream {
			return fmt.Errorf("incr_sync.mongo_fetch_method must be 'change_stream' when special.source.db.flag is set")
		}
	}

	/*****************************2. full sync******************************/

	/*****************************3. incr sync******************************/
	// set incr_sync.worker = shards number if source is sharding
	if len(conf.Options.MongoUrls) > 1 {
		if conf.Options.IncrSyncWorker != len(conf.Options.MongoUrls) &&
			conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodOplog {
			// only change when incr_sync.mongo_fetch_method = oplog
			conf.Options.IncrSyncWorker = len(conf.Options.MongoUrls)
		}
		if conf.Options.FilterDDLEnable == true &&
			conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodOplog {
			return fmt.Errorf("DDL is not support for sharding when incr_sync.mongo_fetch_method == 'oplog'")
		}
	}
	if conf.Options.Tunnel == utils.VarTunnelDirect &&
		conf.Options.IncrSyncWorkerOplogCompressor != utils.VarIncrSyncWorkerOplogCompressorNone {
		conf.Options.IncrSyncWorkerOplogCompressor = utils.VarIncrSyncWorkerOplogCompressorNone
	}
	if len(conf.Options.TunnelAddress) == 0 &&
		conf.Options.Tunnel != utils.VarTunnelMock {
		return fmt.Errorf("incr_sync.tunnel.address shouldn't be empty when incr_sync.tunnel != 'mock'")
	}
	if conf.Options.TunnelKafkaPartitionNumber <= 0 {
		conf.Options.TunnelKafkaPartitionNumber = 1
	} else if conf.Options.TunnelKafkaPartitionNumber > conf.Options.IncrSyncWorker {
		return fmt.Errorf("tunnel.kafka.partition[%v] number should <= incr_sync.worker number[%v]",
			conf.Options.TunnelKafkaPartitionNumber, conf.Options.IncrSyncWorker)
	}
	conf.Options.IncrSyncCollisionEnable = conf.Options.IncrSyncExecutor != 1
	if conf.Options.Tunnel != utils.VarTunnelDirect &&
		conf.Options.SyncMode != utils.VarSyncModeIncr {
		return fmt.Errorf("full sync only support when tunnel type == direct")
	}
	// check source mongodb version >= 4.0 when change stream enable
	if conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodChangeStream {
		if conf.Options.MongoSUrl == "" && len(conf.Options.MongoUrls) > 1 {
			return fmt.Errorf("mongo_s_url should be given when source is sharding and fetch method is change stream")
		}

		source, err := getSourceDbUrl()
		if err != nil {
			return err
		}

		conn, err := utils.NewMongoCommunityConn(source, utils.VarMongoConnectModeSecondaryPreferred, true,
			utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile)
		if err != nil {
			return fmt.Errorf("connect source[%v] failed[%v]", utils.BlockMongoUrlPassword(source, "***"), err)
		}
		if isOk, err := utils.GetAndCompareVersion(conn, "4.0.1", conf.Options.SourceDBVersion); err != nil {
			return fmt.Errorf("compare source[%v] to v4.0.1 failed[%v]", source, err)
		} else if !isOk {
			return fmt.Errorf("source[%v] version should >= 4.0.1 when incr_sync.mongo_fetch_method == %v",
				conf.Options.MongoUrls[0], utils.VarIncrSyncMongoFetchMethodChangeStream)
		}
	} else {
		// disable mongos if fetch method != 'change_stream'
		// conf.Options.MongoSUrl = ""
	}
	// set compressor to none when tunnel message is not 'raw'
	if conf.Options.TunnelMessage != utils.VarTunnelMessageRaw {
		if conf.Options.IncrSyncWorkerOplogCompressor != utils.VarIncrSyncWorkerOplogCompressorNone {
			conf.Options.IncrSyncWorkerOplogCompressor = utils.VarIncrSyncWorkerOplogCompressorNone
		}
	}
	// disable oplog disk persist when sync mode isn't 'full'
	if conf.Options.FullSyncReaderOplogStoreDisk {
		if conf.Options.SyncMode != utils.VarSyncModeAll {
			conf.Options.FullSyncReaderOplogStoreDisk = false
		}
	}
	// only enable 'incr_sync.change_stream.watch_full_document' when tunnel != direct.
	if conf.Options.IncrSyncChangeStreamWatchFullDocument {
		if conf.Options.Tunnel == utils.VarTunnelDirect {
			conf.Options.IncrSyncChangeStreamWatchFullDocument = false
		}
	}
	// set start position to 0 when sync_mode != "incr"
	if conf.Options.SyncMode != utils.VarSyncModeIncr {
		conf.Options.CheckpointStartPosition = 1
	}

	/*****************************4. inner variables******************************/
	if conf.Options.IncrSyncReaderDebug != utils.VarIncrSyncReaderDebugNone &&
		conf.Options.IncrSyncReaderDebug != utils.VarIncrSyncReaderDebugDiscard &&
		conf.Options.IncrSyncReaderDebug != utils.VarIncrSyncReaderDebugPrint {
		return fmt.Errorf("incr_sync.reader.debug[%v] invalid", conf.Options.IncrSyncReaderDebug)
	}

	return nil
}