in consumer/push_consumer.go [543:627]
func (pc *pushConsumer) validate() error {
if err := internal.ValidateGroup(pc.consumerGroup); err != nil {
return err
}
if pc.consumerGroup == internal.DefaultConsumerGroup {
// TODO FQA
return fmt.Errorf("consumerGroup can't equal [%s], please specify another one", internal.DefaultConsumerGroup)
}
if len(pc.subscribedTopic) == 0 {
rlog.Warning("not subscribe any topic yet", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
}
if pc.option.ConsumeConcurrentlyMaxSpan < 1 || pc.option.ConsumeConcurrentlyMaxSpan > 65535 {
if pc.option.ConsumeConcurrentlyMaxSpan == 0 {
pc.option.ConsumeConcurrentlyMaxSpan = 1000
} else {
return errors.New("option.ConsumeConcurrentlyMaxSpan out of range [1, 65535]")
}
}
if pc.option.PullThresholdForQueue.Load() < 1 || pc.option.PullThresholdForQueue.Load() > 65535 {
if pc.option.PullThresholdForQueue.Load() == 0 {
pc.option.PullThresholdForQueue.Store(1024)
} else {
return errors.New("option.PullThresholdForQueue out of range [1, 65535]")
}
}
if pc.option.PullThresholdForTopic < 1 || pc.option.PullThresholdForTopic > 6553500 {
if pc.option.PullThresholdForTopic == 0 {
pc.option.PullThresholdForTopic = 102400
} else {
return errors.New("option.PullThresholdForTopic out of range [1, 6553500]")
}
}
if pc.option.PullThresholdSizeForQueue.Load() < 1 || pc.option.PullThresholdSizeForQueue.Load() > 1024 {
if pc.option.PullThresholdSizeForQueue.Load() == 0 {
pc.option.PullThresholdSizeForQueue.Store(512)
} else {
return errors.New("option.PullThresholdSizeForQueue out of range [1, 1024]")
}
}
if pc.option.PullThresholdSizeForTopic < 1 || pc.option.PullThresholdSizeForTopic > 102400 {
if pc.option.PullThresholdSizeForTopic == 0 {
pc.option.PullThresholdSizeForTopic = 51200
} else {
return errors.New("option.PullThresholdSizeForTopic out of range [1, 102400]")
}
}
if interval := pc.option.PullInterval.Load(); interval < 0 || interval > 65535*time.Millisecond {
return errors.New("option.PullInterval out of range [0, 65535]")
}
if pc.option.ConsumeMessageBatchMaxSize < 1 || pc.option.ConsumeMessageBatchMaxSize > 1024 {
if pc.option.ConsumeMessageBatchMaxSize == 0 {
pc.option.ConsumeMessageBatchMaxSize = 1
} else {
return errors.New("option.ConsumeMessageBatchMaxSize out of range [1, 1024]")
}
}
if pullBatchSize := pc.option.PullBatchSize.Load(); pullBatchSize < 1 || pullBatchSize > 1024 {
if pullBatchSize == 0 {
pc.option.PullBatchSize.Store(32)
} else {
return errors.New("option.PullBatchSize out of range [1, 1024]")
}
}
if pc.option.ConsumeGoroutineNums < 1 || pc.option.ConsumeGoroutineNums > 100000 {
if pc.option.ConsumeGoroutineNums == 0 {
pc.option.ConsumeGoroutineNums = 20
} else {
return errors.New("option.ConsumeGoroutineNums out of range [1, 100000]")
}
}
return nil
}