func sanitizeOptions()

in nimo-shake/main/main.go [103:248]


func sanitizeOptions() error {
	if len(conf.Options.Id) == 0 {
		return fmt.Errorf("id[%v] shouldn't be empty", conf.Options.Id)
	}

	if conf.Options.SyncMode != utils.SyncModeAll && conf.Options.SyncMode != utils.SyncModeFull {
		return fmt.Errorf("sync_mode[%v] illegal, should in {all, full}", conf.Options.SyncMode)
	}

	if conf.Options.IncrSyncParallel != true {
		conf.Options.IncrSyncParallel = false
	} else {
		if conf.Options.SyncMode != utils.SyncModeAll {
			return fmt.Errorf("sync_mode must be all when incr_sync_parallel is true")
		}
	}
	if conf.Options.IncreasePersistDir == "" {
		conf.Options.IncreasePersistDir = "/tmp/"
	}

	if conf.Options.SourceAccessKeyID == "" {
		return fmt.Errorf("source.access_key_id shouldn't be empty")
	}

	if conf.Options.SourceSecretAccessKey == "" {
		return fmt.Errorf("source.secret_access_key shouldn't be empty")
	}

	if conf.Options.FilterCollectionBlack != "" && conf.Options.FilterCollectionWhite != "" {
		return fmt.Errorf("filter.collection.white and filter.collection.black can't both be given")
	}

	if conf.Options.QpsFull <= 0 || conf.Options.QpsIncr <= 0 {
		return fmt.Errorf("qps should > 0")
	}

	if conf.Options.QpsFullBatchNum <= 0 {
		conf.Options.QpsFullBatchNum = 128
	}
	if conf.Options.QpsIncrBatchNum <= 0 {
		conf.Options.QpsIncrBatchNum = 128
	}

	if conf.Options.TargetType != utils.TargetTypeMongo && conf.Options.TargetType != utils.TargetTypeAliyunDynamoProxy {
		return fmt.Errorf("conf.Options.TargetType[%v] supports {mongodb, aliyun_dynamo_proxy} currently", conf.Options.TargetType)
	}

	if len(conf.Options.TargetAddress) == 0 {
		return fmt.Errorf("target.address[%v] illegal", conf.Options.TargetAddress)
	}

	if conf.Options.FullConcurrency > 4096 || conf.Options.FullConcurrency == 0 {
		return fmt.Errorf("full.concurrency[%v] should in (0, 4096]", conf.Options.FullConcurrency)
	}

	if conf.Options.FullDocumentConcurrency > 4096 || conf.Options.FullDocumentConcurrency == 0 {
		return fmt.Errorf("full.document.concurrency[%v] should in (0, 4096]", conf.Options.FullDocumentConcurrency)
	}

	if conf.Options.FullDocumentWriteBatch <= 0 {
		if conf.Options.TargetType == utils.TargetTypeAliyunDynamoProxy {
			conf.Options.FullDocumentWriteBatch = 25
		} else {
			conf.Options.FullDocumentWriteBatch = 128
		}
	} else if conf.Options.FullDocumentWriteBatch > 25 && conf.Options.TargetType == utils.TargetTypeAliyunDynamoProxy {
		conf.Options.FullDocumentWriteBatch = 25
	}

	if conf.Options.FullReadConcurrency <= 0 {
		conf.Options.FullReadConcurrency = 1
	} else if conf.Options.FullReadConcurrency > 8192 {
		return fmt.Errorf("full.read.concurrency[%v] should in (0, 8192]", conf.Options.FullReadConcurrency)
	}

	if conf.Options.FullDocumentParser > 4096 || conf.Options.FullDocumentParser == 0 {
		return fmt.Errorf("full.document.parser[%v] should in (0, 4096]", conf.Options.FullDocumentParser)
	}

	// always enable
	conf.Options.FullEnableIndexPrimary = true

	if conf.Options.ConvertType == "" {
		conf.Options.ConvertType = utils.ConvertMTypeChange
	}
	if conf.Options.ConvertType != utils.ConvertTypeRaw &&
		conf.Options.ConvertType != utils.ConvertTypeChange &&
		conf.Options.ConvertType != utils.ConvertMTypeChange {
		return fmt.Errorf("convert.type[%v] illegal", conf.Options.ConvertType)
	}

	if conf.Options.IncreaseConcurrency == 0 {
		return fmt.Errorf("increase.concurrency should > 0")
	}

	if conf.Options.TargetMongoDBType != "" && conf.Options.TargetMongoDBType != utils.TargetMongoDBTypeReplica &&
		conf.Options.TargetMongoDBType != utils.TargetMongoDBTypeSharding {
		return fmt.Errorf("illegal target.mongodb.type[%v]", conf.Options.TargetMongoDBType)
	}

	if conf.Options.TargetType == utils.TargetTypeMongo && conf.Options.TargetDBExist != "" &&
		conf.Options.TargetDBExist != utils.TargetDBExistRename &&
		conf.Options.TargetDBExist != utils.TargetDBExistDrop ||
		conf.Options.TargetType == utils.TargetTypeAliyunDynamoProxy && conf.Options.TargetDBExist != "" &&
			conf.Options.TargetDBExist != utils.TargetDBExistDrop {
		return fmt.Errorf("target.mongodb.exist[%v] should be 'drop' when target.type=%v",
			conf.Options.TargetDBExist, conf.Options.TargetType)
	}
	// set ConvertType
	if conf.Options.TargetType == utils.TargetTypeAliyunDynamoProxy {
		conf.Options.ConvertType = utils.ConvertTypeSame
	}

	// checkpoint
	if conf.Options.CheckpointType == "" {
		conf.Options.CheckpointType = checkpoint.CheckpointWriterTypeFile
	}
	if conf.Options.CheckpointType == checkpoint.CheckpointWriterTypeMongo &&
		conf.Options.CheckpointAddress == "" &&
		conf.Options.TargetType != utils.TargetTypeMongo {
		return fmt.Errorf("checkpoint.type should == file when checkpoint.address is empty and target.type != mongodb")
	}

	if conf.Options.CheckpointAddress == "" {
		if conf.Options.TargetType == utils.TargetTypeMongo {
			conf.Options.CheckpointAddress = conf.Options.TargetAddress
		} else {
			conf.Options.CheckpointAddress = "checkpoint"
		}
	}
	if conf.Options.CheckpointDb == "" {
		conf.Options.CheckpointDb = fmt.Sprintf("%s-%s", conf.Options.Id, "checkpoint")
	}

	if conf.Options.TargetType == utils.TargetTypeAliyunDynamoProxy &&
		(!conf.Options.IncreaseExecutorUpsert || !conf.Options.IncreaseExecutorInsertOnDupUpdate) {
		return fmt.Errorf("increase.executor.upsert and increase.executor.insert_on_dup_update should be "+
			"enable when target type is %v", utils.TargetTypeAliyunDynamoProxy)
	}

	if conf.Options.SourceEndpointUrl != "" && conf.Options.SyncMode != "full" {
		return fmt.Errorf("only support sync.mode=full when source.endpoint_url is set")
	}

	return nil
}