plugin/connector/rocketmq/producer.go (183 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rocketmq
import (
"context"
"errors"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector/rocketmq/client"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector/rocketmq/constants"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector/rocketmq/convert"
"github.com/apache/rocketmq-client-go/v2/primitive"
ce "github.com/cloudevents/sdk-go/v2"
"strings"
"sync"
"time"
)
type SendCallback func(ctx context.Context, result *primitive.SendResult, err error)
type Producer struct {
rocketMQProducer client.RocketMQProducer
started bool
mutex sync.Mutex
}
// NewProducer get new producer, needs to be Initiated before using
func NewProducer() *Producer {
return &Producer{}
}
// InitProducer init producer by properties
func (p *Producer) InitProducer(properties map[string]string) error {
producer, err := client.NewRocketMQProducerWrapper(properties)
if err != nil {
return err
}
p.rocketMQProducer = producer
return nil
}
// IsStarted check if producer is started
func (p *Producer) IsStarted() bool {
return p.rocketMQProducer != nil && p.started
}
// IsClosed check if producer is closed
func (p *Producer) IsClosed() bool {
return p.rocketMQProducer != nil && !p.started
}
// Start make producer started
func (p *Producer) Start() error {
if p.rocketMQProducer == nil {
return errors.New("start rocketmq producer fail, producer should be initiated first")
}
p.mutex.Lock()
defer p.mutex.Unlock()
if !p.started {
err := p.rocketMQProducer.Start()
if err != nil {
return err
}
p.started = true
}
return nil
}
// Shutdown terminate the producer
func (p *Producer) Shutdown() error {
if p.rocketMQProducer == nil {
return errors.New("shutdown rocketmq producer fail, producer should be initiated first")
}
p.mutex.Lock()
defer p.mutex.Unlock()
if p.started {
err := p.rocketMQProducer.Shutdown()
if err != nil {
return err
}
p.started = false
}
return nil
}
// Publish async publish message to broker
func (p *Producer) Publish(ctx context.Context, event *ce.Event, callback *connector.SendCallback) error {
if err := p.checkProducerStatus(); err != nil {
return err
}
msg, err := convert.NewRocketMQMessageWriter(event.Subject()).ToMessage(ctx, event)
if err != nil {
return err
}
p.supplySysProp(msg, event)
err = p.rocketMQProducer.SendAsync(ctx, p.sendCallbackConvert(callback), msg)
if err != nil {
return err
}
return nil
}
// SendOneway async send message without callback
func (p *Producer) SendOneway(ctx context.Context, event *ce.Event) error {
if err := p.checkProducerStatus(); err != nil {
return err
}
msg, err := convert.NewRocketMQMessageWriter(event.Subject()).ToMessage(ctx, event)
if err != nil {
return err
}
p.supplySysProp(msg, event)
err = p.rocketMQProducer.SendOneWay(ctx, msg)
if err != nil {
return err
}
return nil
}
// Request async request message
func (p *Producer) Request(ctx context.Context, event *ce.Event, callback *connector.RequestReplyCallback,
timeout time.Duration) error {
if err := p.checkProducerStatus(); err != nil {
return err
}
msg, err := convert.NewRocketMQMessageWriter(event.Subject()).ToMessage(ctx, event)
if err != nil {
return err
}
p.supplySysProp(msg, event)
return p.rocketMQProducer.RequestAsync(ctx, timeout, p.requestReplyCallbackConvert(event.Subject(), callback), msg)
}
// Reply async send message to reply
func (p *Producer) Reply(ctx context.Context, event *ce.Event, callback *connector.SendCallback) error {
if err := p.checkProducerStatus(); err != nil {
return err
}
msg, err := convert.NewRocketMQMessageWriter(event.Subject()).ToMessage(ctx, event)
if err != nil {
return err
}
p.supplySysProp(msg, event)
err = p.rocketMQProducer.SendAsync(ctx, p.sendCallbackConvert(callback))
if err != nil {
return err
}
return nil
}
// CheckTopicExist RocketMQ go-sdk doesn't support topic check
func (p *Producer) CheckTopicExist(topicName string) (bool, error) {
// RocketMQ go-sdk doesn't support topic check
return false, errors.New("rocketmq producer doesn't support topic check")
}
// SetExtFields do nothing, RocketMQ go-sdk doesn't support dynamic client option modify
func (p *Producer) SetExtFields() error {
return nil
}
// checkProducerStatus check is the producer has started
func (p *Producer) checkProducerStatus() error {
if p.rocketMQProducer == nil {
return errors.New("rocketmq producer has not been initiated")
}
if !p.started {
return errors.New("rocketmq producer has not started")
}
return nil
}
// supplySysProp convert producer client's system properties format
func (p *Producer) supplySysProp(message *primitive.Message, cloudEvent *ce.Event) {
for _, propertyKey := range constants.RocketMQMessageProperties.ToSlice() {
key := strings.ReplaceAll(strings.ToLower(propertyKey), "_", constants.MessagePropertySeparator)
if val, ok := cloudEvent.Extensions()[key]; ok {
message.WithProperty(propertyKey, val.(string))
message.RemoveProperty(key)
}
}
}
// sendCallbackConvert convert connector API callback to RocketMQ callback
func (p *Producer) sendCallbackConvert(callback *connector.SendCallback) SendCallback {
return func(ctx context.Context, result *primitive.SendResult, err error) {
if err != nil {
callback.OnError(&connector.ErrorResult{
Topic: result.MessageQueue.Topic,
Err: err,
})
}
callback.OnSuccess(p.convertToSendResult(result))
}
}
// requestReplyCallbackConvert convert connector API callback to RocketMQ callback
func (p *Producer) requestReplyCallbackConvert(topic string,
callback *connector.RequestReplyCallback) func(ctx context.Context, msg *primitive.Message, err error) {
return func(ctx context.Context, msg *primitive.Message, err error) {
if err != nil {
callback.OnError(&connector.ErrorResult{
Topic: topic,
Err: err,
})
return
}
convert.TransferMessageSystemProperties(msg)
event, err := convert.NewRocketMQMessageReader(msg).ToCloudEvent(ctx)
if err != nil {
panic(err)
}
callback.OnSuccess(event)
}
}
func (p *Producer) convertToSendResult(result *primitive.SendResult) *connector.SendResult {
return &connector.SendResult{
MessageId: result.MsgID,
Topic: result.MessageQueue.Topic,
}
}