in plugins/rocketmq/producer/async_producer.go [43:100]
func (sa *SendASyncInterceptor) BeforeInvoke(invocation operator.Invocation) error {
defaultProducer := invocation.CallerInstance().(*nativedefaultProducer)
peer := strings.Join(defaultProducer.client.GetNameSrv().AddrList(), aSyncSemicolon)
msgList := invocation.Args()[2].([]*primitive.Message)
topic := msgList[0].Topic
operationName := rmqASyncSendPrefix + topic + rmqASyncSuffix
span, err := tracing.CreateExitSpan(operationName, peer, func(headerKey, headerValue string) error {
for _, message := range msgList {
message.WithProperty(headerKey, headerValue)
}
return nil
},
tracing.WithLayer(tracing.SpanLayerMQ),
tracing.WithComponent(rmqASyncComponentID),
tracing.WithTag(tracing.TagMQTopic, topic),
)
if err != nil {
return err
}
continueSnapShot := tracing.CaptureContext()
zuper := invocation.Args()[1].(func(ctx context.Context, result *primitive.SendResult, err error))
// enhance async callback method
callbackFunc := func(ctx context.Context, sendResult *primitive.SendResult, err error) {
defer tracing.CleanContext()
tracing.ContinueContext(continueSnapShot)
operationName = rmqASyncSendPrefix + topic + rmqCallbackSuffix
localSpan, localErr := tracing.CreateLocalSpan(operationName,
tracing.WithComponent(rmqASyncComponentID),
tracing.WithLayer(tracing.SpanLayerMQ),
tracing.WithTag(tracing.TagMQTopic, topic),
)
if localErr != nil {
zuper(ctx, sendResult, err)
return
}
if err != nil {
span.Error(err.Error())
}
localSpan.Tag(tracing.TagMQStatus, SendStatusStr(sendResult.Status))
localSpan.Tag(tracing.TagMQQueue, fmt.Sprintf("%d", sendResult.MessageQueue.QueueId))
localSpan.Tag(tracing.TagMQBroker, defaultProducer.client.GetNameSrv().
FindBrokerAddrByName(sendResult.MessageQueue.BrokerName))
localSpan.Tag(tracing.TagMQMsgID, sendResult.MsgID)
localSpan.Tag(aSyncTagMQOffsetMsgID, sendResult.OffsetMsgID)
zuper(ctx, sendResult, err)
localSpan.SetPeer(peer)
localSpan.End()
}
span.SetPeer(peer)
invocation.ChangeArg(1, callbackFunc)
invocation.SetContext(span)
return nil
}