plugins/rocketmq/producer/async_producer.go (82 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package producer
import (
"context"
"fmt"
"strings"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/skywalking-go/plugins/core/operator"
"github.com/apache/skywalking-go/plugins/core/tracing"
)
const (
rmqASyncSendPrefix = "RocketMQ/"
rmqASyncSuffix = "/AsyncProducer"
rmqCallbackSuffix = "/Producer/Callback"
rmqASyncComponentID = 38
aSyncSemicolon = ";"
aSyncTagMQOffsetMsgID = "mq.offset.msg.id"
)
type SendASyncInterceptor struct {
}
func (sa *SendASyncInterceptor) BeforeInvoke(invocation operator.Invocation) error {
defaultProducer := invocation.CallerInstance().(*nativedefaultProducer)
peer := strings.Join(defaultProducer.client.GetNameSrv().AddrList(), aSyncSemicolon)
msgList := invocation.Args()[2].([]*primitive.Message)
topic := msgList[0].Topic
operationName := rmqASyncSendPrefix + topic + rmqASyncSuffix
span, err := tracing.CreateExitSpan(operationName, peer, func(headerKey, headerValue string) error {
for _, message := range msgList {
message.WithProperty(headerKey, headerValue)
}
return nil
},
tracing.WithLayer(tracing.SpanLayerMQ),
tracing.WithComponent(rmqASyncComponentID),
tracing.WithTag(tracing.TagMQTopic, topic),
)
if err != nil {
return err
}
continueSnapShot := tracing.CaptureContext()
zuper := invocation.Args()[1].(func(ctx context.Context, result *primitive.SendResult, err error))
// enhance async callback method
callbackFunc := func(ctx context.Context, sendResult *primitive.SendResult, err error) {
defer tracing.CleanContext()
tracing.ContinueContext(continueSnapShot)
operationName = rmqASyncSendPrefix + topic + rmqCallbackSuffix
localSpan, localErr := tracing.CreateLocalSpan(operationName,
tracing.WithComponent(rmqASyncComponentID),
tracing.WithLayer(tracing.SpanLayerMQ),
tracing.WithTag(tracing.TagMQTopic, topic),
)
if localErr != nil {
zuper(ctx, sendResult, err)
return
}
if err != nil {
span.Error(err.Error())
}
localSpan.Tag(tracing.TagMQStatus, SendStatusStr(sendResult.Status))
localSpan.Tag(tracing.TagMQQueue, fmt.Sprintf("%d", sendResult.MessageQueue.QueueId))
localSpan.Tag(tracing.TagMQBroker, defaultProducer.client.GetNameSrv().
FindBrokerAddrByName(sendResult.MessageQueue.BrokerName))
localSpan.Tag(tracing.TagMQMsgID, sendResult.MsgID)
localSpan.Tag(aSyncTagMQOffsetMsgID, sendResult.OffsetMsgID)
zuper(ctx, sendResult, err)
localSpan.SetPeer(peer)
localSpan.End()
}
span.SetPeer(peer)
invocation.ChangeArg(1, callbackFunc)
invocation.SetContext(span)
return nil
}
func (sa *SendASyncInterceptor) AfterInvoke(invocation operator.Invocation, result ...interface{}) error {
if invocation.GetContext() == nil {
return nil
}
span := invocation.GetContext().(tracing.Span)
if err, ok := result[0].(error); ok && err != nil {
span.Error(err.Error())
}
span.End()
return nil
}