func()

in plugins/rocketmq/producer/async_producer.go [43:100]


func (sa *SendASyncInterceptor) BeforeInvoke(invocation operator.Invocation) error {
	defaultProducer := invocation.CallerInstance().(*nativedefaultProducer)
	peer := strings.Join(defaultProducer.client.GetNameSrv().AddrList(), aSyncSemicolon)
	msgList := invocation.Args()[2].([]*primitive.Message)
	topic := msgList[0].Topic
	operationName := rmqASyncSendPrefix + topic + rmqASyncSuffix

	span, err := tracing.CreateExitSpan(operationName, peer, func(headerKey, headerValue string) error {
		for _, message := range msgList {
			message.WithProperty(headerKey, headerValue)
		}
		return nil
	},
		tracing.WithLayer(tracing.SpanLayerMQ),
		tracing.WithComponent(rmqASyncComponentID),
		tracing.WithTag(tracing.TagMQTopic, topic),
	)
	if err != nil {
		return err
	}

	continueSnapShot := tracing.CaptureContext()
	zuper := invocation.Args()[1].(func(ctx context.Context, result *primitive.SendResult, err error))
	// enhance async callback method
	callbackFunc := func(ctx context.Context, sendResult *primitive.SendResult, err error) {
		defer tracing.CleanContext()
		tracing.ContinueContext(continueSnapShot)
		operationName = rmqASyncSendPrefix + topic + rmqCallbackSuffix

		localSpan, localErr := tracing.CreateLocalSpan(operationName,
			tracing.WithComponent(rmqASyncComponentID),
			tracing.WithLayer(tracing.SpanLayerMQ),
			tracing.WithTag(tracing.TagMQTopic, topic),
		)
		if localErr != nil {
			zuper(ctx, sendResult, err)
			return
		}
		if err != nil {
			span.Error(err.Error())
		}
		localSpan.Tag(tracing.TagMQStatus, SendStatusStr(sendResult.Status))
		localSpan.Tag(tracing.TagMQQueue, fmt.Sprintf("%d", sendResult.MessageQueue.QueueId))
		localSpan.Tag(tracing.TagMQBroker, defaultProducer.client.GetNameSrv().
			FindBrokerAddrByName(sendResult.MessageQueue.BrokerName))
		localSpan.Tag(tracing.TagMQMsgID, sendResult.MsgID)
		localSpan.Tag(aSyncTagMQOffsetMsgID, sendResult.OffsetMsgID)

		zuper(ctx, sendResult, err)
		localSpan.SetPeer(peer)
		localSpan.End()
	}

	span.SetPeer(peer)
	invocation.ChangeArg(1, callbackFunc)
	invocation.SetContext(span)
	return nil
}