plugin/connector/rocketmq/client/rocketmq_consumer.go (103 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"context"
"errors"
"fmt"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector/rocketmq/constants"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"strconv"
"strings"
)
// SubscribeFunc is the callback type accepted by RocketMQConsumer.Subscribe.
// It receives a context and zero or more messages and returns a consume
// result together with an error.
type SubscribeFunc func(context.Context, ...*primitive.MessageExt) (consumer.ConsumeResult, error)
// RocketMQConsumer is the consumer abstraction exposed by this package.
// NewRocketMQConsumerWrapper returns a RocketMQConsumerWrapper as its
// implementation.
type RocketMQConsumer interface {
// Start starts the consumer.
Start() error
// Shutdown shuts the consumer down.
Shutdown() error
// Subscribe subscribes to topic with the given selector; f is the callback
// passed along with the subscription.
Subscribe(topic string, selector consumer.MessageSelector, f SubscribeFunc) error
// Unsubscribe removes the subscription for topic.
Unsubscribe(topic string) error
// Suspend suspends the consumer.
Suspend()
// Resume resumes the consumer.
Resume()
// IsBroadCasting reports whether the consumer uses the broadcasting
// message model.
IsBroadCasting() bool
}
// RocketMQConsumerWrapper implements RocketMQConsumer by delegating to a
// RocketMQ push consumer.
type RocketMQConsumerWrapper struct {
// options is not assigned anywhere in the code visible in this file.
options []consumer.Option
// consumer is the underlying push consumer that the wrapper methods call.
consumer rocketmq.PushConsumer
// messageModel is the consume model of this wrapper. The constructor
// initializes it to consumer.Clustering, option parsing may overwrite it,
// and IsBroadCasting reads it.
messageModel consumer.MessageModel
}
// NewRocketMQConsumerWrapper builds a RocketMQConsumer from the given
// properties.
//
// The wrapper starts with the clustering message model, the properties are
// converted into push consumer options (which may switch the model), and a
// push consumer is created from those options. Any error from option parsing
// or from consumer creation is returned as is, with a nil consumer.
func NewRocketMQConsumerWrapper(properties map[string]string) (RocketMQConsumer, error) {
	wrapper := new(RocketMQConsumerWrapper)
	wrapper.messageModel = consumer.Clustering

	opts, err := wrapper.getConsumerOptionsFromProperties(properties)
	if err != nil {
		return nil, err
	}

	pushConsumer, err := rocketmq.NewPushConsumer(opts...)
	if err != nil {
		return nil, err
	}

	wrapper.consumer = pushConsumer
	return wrapper, nil
}
// Start calls Start on the underlying push consumer and returns its error
// unchanged.
func (r *RocketMQConsumerWrapper) Start() error {
return r.consumer.Start()
}
// Shutdown calls Shutdown on the underlying push consumer and returns its
// error unchanged.
func (r *RocketMQConsumerWrapper) Shutdown() error {
return r.consumer.Shutdown()
}
// Subscribe forwards topic, selector and f to Subscribe on the underlying
// push consumer and returns its error unchanged.
func (r *RocketMQConsumerWrapper) Subscribe(topic string, selector consumer.MessageSelector, f SubscribeFunc) error {
return r.consumer.Subscribe(topic, selector, f)
}
// Unsubscribe forwards topic to Unsubscribe on the underlying push consumer
// and returns its error unchanged.
func (r *RocketMQConsumerWrapper) Unsubscribe(topic string) error {
return r.consumer.Unsubscribe(topic)
}
// Suspend calls Suspend on the underlying push consumer.
func (r *RocketMQConsumerWrapper) Suspend() {
r.consumer.Suspend()
}
// Resume calls Resume on the underlying push consumer.
func (r *RocketMQConsumerWrapper) Resume() {
r.consumer.Resume()
}
// IsBroadCasting reports whether the wrapper's recorded message model is
// consumer.BroadCasting.
func (r *RocketMQConsumerWrapper) IsBroadCasting() bool {
	switch r.messageModel {
	case consumer.BroadCasting:
		return true
	default:
		return false
	}
}
// getConsumerOptionsFromProperties converts the properties map into RocketMQ
// push consumer options.
//
// The client config is resolved through getClientConfigFromProperties and the
// following options are produced, in order:
//   - name server addresses, from the comma separated access points;
//   - max reconsume times, when configured and parsable as a 32-bit integer;
//   - consume message model, when configured (anything other than the
//     broadcasting model name selects clustering);
//   - consumer group, prefixed with constants.ConsumerGroupBroadcastPrefix
//     when the broadcasting model was selected;
//   - instance name, when configured.
//
// As a side effect r.messageModel is updated whenever a message model is
// configured.
//
// An error is returned when the client config cannot be resolved, when the
// access points are empty, or when the consumer group is empty.
func (r *RocketMQConsumerWrapper) getConsumerOptionsFromProperties(properties map[string]string) ([]consumer.Option, error) {
	clientConfig, err := getClientConfigFromProperties(properties)
	if err != nil {
		// Surface the parse error even if a partially filled config came back.
		return nil, err
	}
	if clientConfig == nil {
		// Never hand (nil, nil) to the caller: it would go on to build a
		// consumer without any options.
		return nil, errors.New("fail to parse rocketmq consumer config, client config is nil")
	}

	accessPoints := clientConfig.AccessPoints
	if len(accessPoints) == 0 {
		return nil, errors.New("fail to parse rocketmq consumer config, invalid access points")
	}

	options := make([]consumer.Option, 0)

	// name server address
	options = append(options, consumer.WithNameServer(strings.Split(accessPoints, ",")))

	// max reconsume times
	if len(clientConfig.MaxReconsumeTimes) != 0 {
		// Best effort: a value that does not parse as a 32-bit integer is
		// skipped and the option is simply not set.
		maxReconsumeTimes, parseErr := strconv.ParseInt(clientConfig.MaxReconsumeTimes, 10, 32)
		if parseErr == nil {
			options = append(options, consumer.WithMaxReconsumeTimes(int32(maxReconsumeTimes)))
		}
	}

	// consume message model
	isBroadCasting := false
	if len(clientConfig.MessageModel) != 0 {
		switch clientConfig.MessageModel {
		case consumer.BroadCasting.String():
			options = append(options, consumer.WithConsumerModel(consumer.BroadCasting))
			isBroadCasting = true
			r.messageModel = consumer.BroadCasting
		default:
			options = append(options, consumer.WithConsumerModel(consumer.Clustering))
			r.messageModel = consumer.Clustering
		}
	}

	// consumer group
	if len(clientConfig.ConsumerGroup) == 0 {
		return nil, errors.New("fail to create rocketmq consumer, consumer group is empty")
	}
	consumerGroup := clientConfig.ConsumerGroup
	if isBroadCasting {
		consumerGroup = fmt.Sprintf("%s-%s", constants.ConsumerGroupBroadcastPrefix, consumerGroup)
	}
	// Use the resolved group name so the broadcast prefix actually takes
	// effect; passing clientConfig.ConsumerGroup here would discard it.
	options = append(options, consumer.WithGroupName(consumerGroup))

	// TODO consumeTimeout config, currently rocket mq go client doesn't support

	// instance name
	if len(clientConfig.InstanceName) != 0 {
		options = append(options, consumer.WithInstance(clientConfig.InstanceName))
	}

	return options, nil
}