plugin/connector/rocketmq/client/rocketmq_producer.go (114 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"context"
"encoding/json"
"errors"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector/rocketmq/config"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector/rocketmq/utils"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"strconv"
"strings"
"time"
)
// RocketMQProducer is the producer contract exposed by this package.
// RocketMQProducerWrapper implements each of these methods by forwarding the
// call to the rocketmq-client-go producer it holds.
type RocketMQProducer interface {
// Start starts the producer; the error reports a failed start.
Start() error
// Shutdown stops the producer; the error reports a failed shutdown.
Shutdown() error
// SendSync sends one or more messages and returns the send result, or an
// error when the send fails.
SendSync(ctx context.Context, msg ...*primitive.Message) (*primitive.SendResult, error)
// SendAsync submits one or more messages and hands the outcome (result or
// error) to the mq callback. The returned error covers the submission itself.
// NOTE(review): callback timing/goroutine is decided by the client library —
// confirm against rocketmq-client-go before relying on it.
SendAsync(ctx context.Context, mq func(ctx context.Context, result *primitive.SendResult, err error),
msg ...*primitive.Message) error
// SendOneWay sends one or more messages without returning a send result.
SendOneWay(ctx context.Context, msg ...*primitive.Message) error
// Request sends a single message and returns a reply message. ttl is passed
// through to the client; presumably it bounds how long the reply is awaited —
// TODO confirm.
Request(ctx context.Context, ttl time.Duration, msg *primitive.Message) (*primitive.Message, error)
// RequestAsync is the callback form of Request: the reply message or error is
// delivered to callback instead of being returned.
RequestAsync(ctx context.Context, ttl time.Duration, callback func(ctx context.Context,
msg *primitive.Message, err error), msg *primitive.Message) error
}
// RocketMQProducerWrapper wraps a rocketmq-client-go producer and exposes it
// through the methods declared on RocketMQProducer. Instances are created with
// NewRocketMQProducerWrapper.
type RocketMQProducerWrapper struct {
// options holds the producer options that were passed to rocketmq.NewProducer
// when this wrapper was built.
options []producer.Option
// producer is the underlying client producer that every method delegates to.
producer rocketmq.Producer
}
// NewRocketMQProducerWrapper builds a RocketMQProducerWrapper from a flat
// property map.
//
// The properties are first translated into producer options, and those options
// are then handed to rocketmq.NewProducer. An error from either step is
// returned as-is together with a nil wrapper. This function does not call
// Start on the created producer.
func NewRocketMQProducerWrapper(properties map[string]string) (*RocketMQProducerWrapper, error) {
	opts, optErr := getOptionsProducerFromProperties(properties)
	if optErr != nil {
		return nil, optErr
	}

	inner, newErr := rocketmq.NewProducer(opts...)
	if newErr != nil {
		return nil, newErr
	}

	wrapper := new(RocketMQProducerWrapper)
	wrapper.options = opts
	wrapper.producer = inner
	return wrapper, nil
}
// Start delegates to the wrapped producer's Start and returns its error
// unchanged.
func (r *RocketMQProducerWrapper) Start() error {
	startErr := r.producer.Start()
	return startErr
}
// Shutdown delegates to the wrapped producer's Shutdown and returns its error
// unchanged.
func (r *RocketMQProducerWrapper) Shutdown() error {
	stopErr := r.producer.Shutdown()
	return stopErr
}
// SendSync forwards the messages to the wrapped producer's SendSync and
// returns its send result and error unchanged.
func (r *RocketMQProducerWrapper) SendSync(ctx context.Context, msg ...*primitive.Message) (*primitive.SendResult, error) {
	result, sendErr := r.producer.SendSync(ctx, msg...)
	return result, sendErr
}
// SendAsync forwards the messages and the mq callback to the wrapped
// producer's SendAsync and returns its error unchanged.
func (r *RocketMQProducerWrapper) SendAsync(
	ctx context.Context,
	mq func(ctx context.Context, result *primitive.SendResult, err error),
	msg ...*primitive.Message,
) error {
	submitErr := r.producer.SendAsync(ctx, mq, msg...)
	return submitErr
}
// SendOneWay forwards the messages to the wrapped producer's SendOneWay and
// returns its error unchanged.
func (r *RocketMQProducerWrapper) SendOneWay(ctx context.Context, msg ...*primitive.Message) error {
	sendErr := r.producer.SendOneWay(ctx, msg...)
	return sendErr
}
// Request forwards msg and ttl to the wrapped producer's Request and returns
// the reply message and error unchanged.
func (r *RocketMQProducerWrapper) Request(
	ctx context.Context,
	ttl time.Duration,
	msg *primitive.Message,
) (*primitive.Message, error) {
	reply, reqErr := r.producer.Request(ctx, ttl, msg)
	return reply, reqErr
}
// RequestAsync forwards msg, ttl and the callback to the wrapped producer's
// RequestAsync and returns its error unchanged.
func (r *RocketMQProducerWrapper) RequestAsync(
	ctx context.Context,
	ttl time.Duration,
	callback func(ctx context.Context, msg *primitive.Message, err error),
	msg *primitive.Message,
) error {
	reqErr := r.producer.RequestAsync(ctx, ttl, callback, msg)
	return reqErr
}
// getOptionsProducerFromProperties converts a property map into the option
// list handed to rocketmq.NewProducer.
//
// The map is decoded into a config.ClientConfig and translated as follows:
//   - AccessPoints (required): comma-separated name server addresses. Each
//     entry is trimmed of surrounding whitespace and empty entries are dropped;
//     if nothing remains an error is returned.
//   - Instance name: always taken from utils.GetInstanceName().
//   - ProducerGroupName: passed through as the producer group name.
//   - SendMsgTimeout: integer, interpreted as milliseconds.
//   - ProducerRetryTimes: integer retry count.
//   - CompressMsgBodyThreshold: integer threshold passed to
//     producer.WithCompressMsgBodyOverHowmuch (presumably a body size in bytes;
//     confirm against the client library).
//
// The three numeric settings are best-effort: a value that is empty or does not
// parse as an integer is skipped, so the client library's own default applies.
//
// It returns the decode error from getClientConfigFromProperties, or an error
// when no usable access point is configured.
func getOptionsProducerFromProperties(properties map[string]string) ([]producer.Option, error) {
	clientConfig, err := getClientConfigFromProperties(properties)
	if err != nil {
		return nil, err
	}

	// name server address: tolerate "a:9876, b:9876" and stray commas so that
	// blank or space-padded entries are never handed to the client.
	rawAccessPoints := strings.Split(clientConfig.AccessPoints, ",")
	nameServers := make([]string, 0, len(rawAccessPoints))
	for _, addr := range rawAccessPoints {
		if addr = strings.TrimSpace(addr); addr != "" {
			nameServers = append(nameServers, addr)
		}
	}
	if len(nameServers) == 0 {
		return nil, errors.New("fail to parse rocketmq producer config, invalid access points")
	}

	// At most six options are produced below.
	options := make([]producer.Option, 0, 6)
	options = append(options, producer.WithNameServer(nameServers))

	// instance name
	producerID := utils.GetInstanceName()
	options = append(options, producer.WithInstanceName(producerID))

	// producer group name
	options = append(options, producer.WithGroupName(clientConfig.ProducerGroupName))

	// send msg timeout (milliseconds); strconv.Atoi rejects the empty string,
	// so an unset value is skipped the same way as an unparsable one.
	if sendMsgTimeout, err := strconv.Atoi(clientConfig.SendMsgTimeout); err == nil {
		options = append(options, producer.WithSendMsgTimeout(time.Duration(sendMsgTimeout)*time.Millisecond))
	}

	// producer retry time
	if producerRetryTimes, err := strconv.Atoi(clientConfig.ProducerRetryTimes); err == nil {
		options = append(options, producer.WithRetry(producerRetryTimes))
	}

	// producer compress message body threshold
	if compressMsgBodyThreshold, err := strconv.Atoi(clientConfig.CompressMsgBodyThreshold); err == nil {
		options = append(options, producer.WithCompressMsgBodyOverHowmuch(compressMsgBodyThreshold))
	}

	return options, nil
}
// getClientConfigFromProperties decodes a property map into a
// config.ClientConfig by round-tripping it through JSON: the map is marshalled
// to a JSON object, which is then unmarshalled into a fresh ClientConfig, so
// the struct's JSON field mapping decides which properties are picked up.
//
// It returns a nil config together with the error when either the marshal or
// the unmarshal step fails.
func getClientConfigFromProperties(properties map[string]string) (*config.ClientConfig, error) {
	encoded, marshalErr := json.Marshal(properties)
	if marshalErr != nil {
		return nil, marshalErr
	}

	decoded := new(config.ClientConfig)
	if unmarshalErr := json.Unmarshal(encoded, decoded); unmarshalErr != nil {
		return nil, unmarshalErr
	}
	return decoded, nil
}