func()

in plugins/pulsar/pulsar/send_async_producer.go [35:97]


// BeforeInvoke instruments an asynchronous Pulsar send.
//
// It looks up the broker address for the producer's topic, creates an exit
// span whose propagation headers are written into the outgoing message's
// Properties, and replaces the send callback (argument 2) with a wrapper that
// continues the captured tracing context and records a local span around the
// original callback. The exit span is stored on the invocation via SetContext;
// it is not ended here.
//
// An error is returned when the broker lookup or the exit span creation fails.
func (s *SendAsyncInterceptor) BeforeInvoke(invocation operator.Invocation) error {
	nativeProducer := invocation.CallerInstance().(*nativepartitionProducer)
	topic := nativeProducer.options.Topic
	msg := invocation.Args()[1].(*ProducerMessage)
	lookup, err := nativeProducer.client.lookupService.Lookup(topic)
	if err != nil {
		return err
	}
	peer := lookup.PhysicalAddr.String()
	exitOperationName := pulsarAsyncPrefix + topic + pulsarAsyncSuffix
	callbackOperationName := pulsarAsyncPrefix + topic + pulsarCallbackSuffix

	span, err := tracing.CreateExitSpan(exitOperationName, peer, func(headerKey, headerValue string) error {
		// Writing to a nil map panics, so create it on first use.
		if msg.Properties == nil {
			msg.Properties = make(map[string]string)
		}
		msg.Properties[headerKey] = headerValue
		return nil
	},
		tracing.WithLayer(tracing.SpanLayerMQ),
		tracing.WithComponent(pulsarAsyncComponentID),
		tracing.WithTag(tracing.TagMQBroker, peer),
		tracing.WithTag(tracing.TagMQTopic, nativeProducer.topic),
	)
	if err != nil {
		return err
	}

	continueSnapShot := tracing.CaptureContext()
	zuper := invocation.Args()[2].(func(id MessageID, message *ProducerMessage, err error))

	// invokeOriginal forwards to the caller-supplied callback, tolerating a
	// nil callback so the tracing wrapper never introduces a panic of its own.
	invokeOriginal := func(id MessageID, message *ProducerMessage, err error) {
		if zuper != nil {
			zuper(id, message, err)
		}
	}

	callbackFunc := func(id MessageID, message *ProducerMessage, err error) {
		defer tracing.CleanContext()
		tracing.ContinueContext(continueSnapShot)

		localSpan, localErr := tracing.CreateLocalSpan(callbackOperationName,
			tracing.WithComponent(pulsarAsyncComponentID),
			tracing.WithLayer(tracing.SpanLayerMQ),
			tracing.WithTag(tracing.TagMQTopic, nativeProducer.topic),
		)
		if localErr != nil {
			// Tracing failed; still deliver the result to the original callback.
			invokeOriginal(id, message, err)
			return
		}
		// Deferred so the span is closed even if the original callback panics.
		// Runs before the deferred CleanContext above (LIFO order).
		defer localSpan.End()

		if err != nil {
			// NOTE(review): the error is recorded on the exit span, as before;
			// confirm whether it should be recorded on localSpan instead.
			span.Error(err.Error())
		}
		localSpan.SetPeer(peer)
		localSpan.Tag(tracing.TagMQBroker, peer)
		// id may be nil (e.g. when the send failed); calling String on a nil
		// MessageID would panic, so the tag is only set when an id is present.
		if id != nil {
			localSpan.Tag(tracing.TagMQMsgID, id.String())
		}

		invokeOriginal(id, message, err)
	}

	span.SetPeer(peer)
	invocation.ChangeArg(2, callbackFunc)
	invocation.SetContext(span)
	return nil
}