func()

in plugin/connector/rocketmq/client/rocketmq_consumer.go [104:159]


// getConsumerOptionsFromProperties translates the connector properties into the
// list of consumer.Option values used to configure the RocketMQ consumer.
//
// The properties are first parsed by getClientConfigFromProperties. From the
// resulting config this method derives:
//   - the name server list (AccessPoints split on ","), which is mandatory;
//   - the max reconsume times, when configured;
//   - the consume model, when configured (also recorded on r.messageModel);
//   - the consumer group, which is mandatory and is prefixed with
//     constants.ConsumerGroupBroadcastPrefix for broadcasting consumers;
//   - the instance name, when configured.
//
// It returns an error when the properties cannot be parsed, when the access
// points or the consumer group are empty, or when the max reconsume times is
// not a valid 32-bit integer.
func (r *RocketMQConsumerWrapper) getConsumerOptionsFromProperties(properties map[string]string) ([]consumer.Option, error) {
	clientConfig, err := getClientConfigFromProperties(properties)
	if err != nil {
		return nil, err
	}
	if clientConfig == nil {
		// Guard against a parser that yields no config without reporting why,
		// so the caller never receives a nil option list together with a nil error.
		return nil, errors.New("fail to parse rocketmq consumer config, client config is nil")
	}

	accessPoints := clientConfig.AccessPoints
	if len(accessPoints) == 0 {
		return nil, errors.New("fail to parse rocketmq consumer config, invalid access points")
	}

	options := make([]consumer.Option, 0, 5)

	// name server address
	options = append(options, consumer.WithNameServer(strings.Split(accessPoints, ",")))

	// max reconsume times
	if len(clientConfig.MaxReconsumeTimes) != 0 {
		maxReconsumeTimes, parseErr := strconv.ParseInt(clientConfig.MaxReconsumeTimes, 10, 32)
		if parseErr != nil {
			// Surface the misconfiguration instead of silently falling back to
			// the client default, consistent with the other validations here.
			return nil, fmt.Errorf("fail to parse rocketmq consumer config, invalid max reconsume times %q: %w",
				clientConfig.MaxReconsumeTimes, parseErr)
		}
		options = append(options, consumer.WithMaxReconsumeTimes(int32(maxReconsumeTimes)))
	}

	// consume message model; any configured value other than broadcasting is
	// treated as clustering
	isBroadCasting := false
	if len(clientConfig.MessageModel) != 0 {
		switch clientConfig.MessageModel {
		case consumer.BroadCasting.String():
			options = append(options, consumer.WithConsumerModel(consumer.BroadCasting))
			isBroadCasting = true
			r.messageModel = consumer.BroadCasting
		default:
			options = append(options, consumer.WithConsumerModel(consumer.Clustering))
			r.messageModel = consumer.Clustering
		}
	}

	// consumer group
	if len(clientConfig.ConsumerGroup) == 0 {
		return nil, errors.New("fail to create rocketmq consumer, consumer group is empty")
	}
	consumerGroup := clientConfig.ConsumerGroup
	if isBroadCasting {
		consumerGroup = fmt.Sprintf("%s-%s", constants.ConsumerGroupBroadcastPrefix, consumerGroup)
	}
	// Register under the effective group name: the broadcast-prefixed one when
	// broadcasting, the configured one otherwise.
	options = append(options, consumer.WithGroupName(consumerGroup))

	// TODO consumeTimeout config, currently rocket mq go client doesn't support

	// instance name
	if len(clientConfig.InstanceName) != 0 {
		options = append(options, consumer.WithInstance(clientConfig.InstanceName))
	}

	return options, nil
}