in plugin/connector/rocketmq/client/rocketmq_consumer.go [104:159]
// getConsumerOptionsFromProperties builds the rocketmq consumer options from
// the given connector properties.
//
// The properties are parsed by getClientConfigFromProperties and translated
// into options for: name server addresses (the comma separated access points),
// max reconsume times, message model, consumer group name and instance name.
// When a message model is configured it is also recorded on r.messageModel.
//
// An error is returned when the properties cannot be parsed into a client
// config, when no access points are configured, or when the consumer group
// is empty.
func (r *RocketMQConsumerWrapper) getConsumerOptionsFromProperties(properties map[string]string) ([]consumer.Option, error) {
	clientConfig, err := getClientConfigFromProperties(properties)
	if err != nil {
		return nil, err
	}
	if clientConfig == nil {
		// Never hand back (nil, nil): callers could not tell it apart from success.
		return nil, errors.New("fail to parse rocketmq consumer config, client config is nil")
	}

	accessPoints := clientConfig.AccessPoints
	if len(accessPoints) == 0 {
		return nil, errors.New("fail to parse rocketmq consumer config, invalid access points")
	}
	if len(clientConfig.ConsumerGroup) == 0 {
		return nil, errors.New("fail to create rocketmq consumer, consumer group is empty")
	}

	options := make([]consumer.Option, 0)

	// name server address
	options = append(options, consumer.WithNameServer(strings.Split(accessPoints, ",")))

	// max reconsume times; best effort, a value that is not a valid int32 is
	// skipped and the option is left unset
	if len(clientConfig.MaxReconsumeTimes) != 0 {
		maxReconsumeTimes, parseErr := strconv.ParseInt(clientConfig.MaxReconsumeTimes, 10, 32)
		if parseErr == nil {
			options = append(options, consumer.WithMaxReconsumeTimes(int32(maxReconsumeTimes)))
		}
	}

	// consume message model; any configured value other than broadcasting
	// falls back to clustering
	isBroadCasting := false
	if len(clientConfig.MessageModel) != 0 {
		switch clientConfig.MessageModel {
		case consumer.BroadCasting.String():
			options = append(options, consumer.WithConsumerModel(consumer.BroadCasting))
			isBroadCasting = true
			r.messageModel = consumer.BroadCasting
		default:
			options = append(options, consumer.WithConsumerModel(consumer.Clustering))
			r.messageModel = consumer.Clustering
		}
	}

	// consumer group; broadcasting consumers use the configured group name
	// prefixed with constants.ConsumerGroupBroadcastPrefix
	consumerGroup := clientConfig.ConsumerGroup
	if isBroadCasting {
		consumerGroup = fmt.Sprintf("%s-%s", constants.ConsumerGroupBroadcastPrefix, consumerGroup)
	}
	// Use the computed name, not the raw config value, so the broadcast
	// prefix actually takes effect.
	options = append(options, consumer.WithGroupName(consumerGroup))

	// TODO consumeTimeout config, currently rocket mq go client doesn't support

	// instance name
	if len(clientConfig.InstanceName) != 0 {
		options = append(options, consumer.WithInstance(clientConfig.InstanceName))
	}
	return options, nil
}