plugin/connector/rocketmq/consumer.go (161 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rocketmq
import (
"context"
"errors"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector/rocketmq/client"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector/rocketmq/constants"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector/rocketmq/convert"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
ce "github.com/cloudevents/sdk-go/v2"
"strconv"
"sync"
)
type Consumer struct {
rocketMQConsumer client.RocketMQConsumer
started bool
mutex sync.Mutex
listener *connector.EventListener
subscribeHandler SubscribeHandler
}
// NewConsumer get new consumer, needs to be Initiated before using
func NewConsumer() *Consumer {
return &Consumer{}
}
// InitConsumer init consumer by properties
func (c *Consumer) InitConsumer(properties map[string]string) error {
consumer, err := client.NewRocketMQConsumerWrapper(properties)
if err != nil {
return err
}
c.rocketMQConsumer = consumer
if c.rocketMQConsumer.IsBroadCasting() {
c.subscribeHandler = &BroadCastingMessageSubscribeHandler{consumer: c}
} else {
c.subscribeHandler = &ClusteringMessageSubscribeHandler{consumer: c}
}
return nil
}
// IsStarted check if consumer is started
func (c *Consumer) IsStarted() bool {
return c.rocketMQConsumer != nil && c.started
}
// IsClosed check if consumer is closed
func (c *Consumer) IsClosed() bool {
return c.rocketMQConsumer != nil && !c.started
}
// Start make consumer started
func (c *Consumer) Start() error {
if c.rocketMQConsumer == nil {
return errors.New("start rocketmq consumer fail, producer should be initiated first")
}
c.mutex.Lock()
defer c.mutex.Unlock()
if !c.started {
err := c.rocketMQConsumer.Start()
if err != nil {
return err
}
c.started = true
}
return nil
}
// Shutdown terminate the consumer
func (c *Consumer) Shutdown() error {
if c.rocketMQConsumer == nil {
return errors.New("shutdown rocketmq consumer fail, producer should be initiated first")
}
c.mutex.Lock()
defer c.mutex.Unlock()
if c.started {
err := c.rocketMQConsumer.Shutdown()
if err != nil {
return err
}
c.started = false
}
return nil
}
// UpdateOffset always return error, since currently RocketMQ client doesn't support manual offset updating
func (c *Consumer) UpdateOffset(ctx context.Context, events []*ce.Event) error {
// TODO support offset update
return errors.New("fail to update offset, currently RocketMQ client doesn't support manual offset updating")
}
// Subscribe subscribe topic
func (c *Consumer) Subscribe(topicName string) error {
return c.rocketMQConsumer.Subscribe(topicName, consumer.MessageSelector{}, c.subscribeHandler.handle)
}
// Unsubscribe unsubscribe topic
func (c *Consumer) Unsubscribe(topicName string) error {
return c.rocketMQConsumer.Unsubscribe(topicName)
}
// RegisterEventListener listener's Consume function will be called when message is being consumed
func (c *Consumer) RegisterEventListener(listener *connector.EventListener) {
c.listener = listener
}
// SubscribeHandler interface of message consume handler
type SubscribeHandler interface {
handle(ctx context.Context, msg ...*primitive.MessageExt) (consumer.ConsumeResult, error)
getMessageModel() consumer.MessageModel
}
// ClusteringMessageSubscribeHandler message consume handler of Clustering mode
type ClusteringMessageSubscribeHandler struct {
consumer *Consumer
}
func (h *ClusteringMessageSubscribeHandler) getMessageModel() consumer.MessageModel {
return consumer.Clustering
}
func (h *ClusteringMessageSubscribeHandler) handle(ctx context.Context, msg ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
if len(msg) == 0 {
return consumer.ConsumeSuccess, nil
}
messageExt := msg[0]
messageExt.WithProperty(constants.PropertyMessageBornTimestamp,
strconv.FormatInt(messageExt.BornTimestamp, 10))
messageExt.WithProperty(constants.PropertyMessageStoreTimestamp,
strconv.FormatInt(messageExt.StoreTimestamp, 10))
message := &messageExt.Message
convert.TransferMessageSystemProperties(message)
event, err := convert.NewRocketMQMessageReader(message).ToCloudEvent(context.Background())
if err != nil {
return consumer.ConsumeSuccess, nil
}
consumeResult := consumer.ConsumeSuccess
commitFunction := func(action connector.EventMeshAction) error {
switch action {
case connector.CommitMessage:
consumeResult = consumer.ConsumeSuccess
case connector.ReconsumeLater:
consumeResult = consumer.ConsumeRetryLater
case connector.ManualAck:
// currently, RocketMQ go client doesn't support manual offset updating, so just commit message here
// TODO support manual offset updating
consumeResult = consumer.ConsumeSuccess
}
return nil
}
h.consumer.listener.Consume(event, commitFunction)
return consumeResult, nil
}
// BroadCastingMessageSubscribeHandler message consume handler of BroadCasting mode
type BroadCastingMessageSubscribeHandler struct {
consumer *Consumer
}
func (h *BroadCastingMessageSubscribeHandler) getMessageModel() consumer.MessageModel {
return consumer.BroadCasting
}
func (h *BroadCastingMessageSubscribeHandler) handle(ctx context.Context, msg ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
if len(msg) == 0 {
return consumer.ConsumeSuccess, nil
}
messageExt := msg[0]
messageExt.WithProperty(constants.PropertyMessageBornTimestamp,
strconv.FormatInt(messageExt.BornTimestamp, 10))
messageExt.WithProperty(constants.PropertyMessageStoreTimestamp,
strconv.FormatInt(messageExt.StoreTimestamp, 10))
message := &messageExt.Message
convert.TransferMessageSystemProperties(message)
event, err := convert.NewRocketMQMessageReader(message).ToCloudEvent(context.Background())
if err != nil {
return consumer.ConsumeSuccess, nil
}
consumeResult := consumer.ConsumeSuccess
commitFunction := func(action connector.EventMeshAction) error {
switch action {
case connector.CommitMessage:
consumeResult = consumer.ConsumeSuccess
case connector.ReconsumeLater:
consumeResult = consumer.ConsumeRetryLater
case connector.ManualAck:
// currently, RocketMQ go client doesn't support manual offset updating, so just commit message here
// TODO support manual offset updating
consumeResult = consumer.ConsumeSuccess
}
return nil
}
h.consumer.listener.Consume(event, commitFunction)
return consumeResult, nil
}