in plugins/pulsar/pulsar/send_async_producer.go [35:97]
// BeforeInvoke starts an exit span for an asynchronous send and wraps the
// caller's completion callback so that the callback runs inside its own
// local span.
//
// It resolves the broker address for the producer's topic, creates the exit
// span (writing the propagation headers into the message properties),
// replaces argument 2 of the invocation with the wrapping callback and stores
// the exit span as the invocation context. The lookup error or the
// span-creation error is returned unchanged when either step fails.
func (s *SendAsyncInterceptor) BeforeInvoke(invocation operator.Invocation) error {
	nativeProducer := invocation.CallerInstance().(*nativepartitionProducer)
	topic := nativeProducer.options.Topic
	msg := invocation.Args()[1].(*ProducerMessage)
	lookup, err := nativeProducer.client.lookupService.Lookup(topic)
	if err != nil {
		return err
	}
	peer := lookup.PhysicalAddr.String()
	operationName := pulsarAsyncPrefix + topic + pulsarAsyncSuffix
	// Computed once here instead of reassigning operationName inside the
	// callback, so the closure never writes to a captured variable.
	callbackOperationName := pulsarAsyncPrefix + topic + pulsarCallbackSuffix
	span, err := tracing.CreateExitSpan(operationName, peer, func(headerKey, headerValue string) error {
		// Writing to a nil map panics, so create it on first use.
		if msg.Properties == nil {
			msg.Properties = make(map[string]string)
		}
		msg.Properties[headerKey] = headerValue
		return nil
	},
		tracing.WithLayer(tracing.SpanLayerMQ),
		tracing.WithComponent(pulsarAsyncComponentID),
		tracing.WithTag(tracing.TagMQBroker, peer),
		tracing.WithTag(tracing.TagMQTopic, nativeProducer.topic),
	)
	if err != nil {
		return err
	}
	continueSnapShot := tracing.CaptureContext()
	zuper := invocation.Args()[2].(func(id MessageID, message *ProducerMessage, err error))
	callbackFunc := func(id MessageID, message *ProducerMessage, err error) {
		defer tracing.CleanContext()
		tracing.ContinueContext(continueSnapShot)
		localSpan, localErr := tracing.CreateLocalSpan(callbackOperationName,
			tracing.WithComponent(pulsarAsyncComponentID),
			tracing.WithLayer(tracing.SpanLayerMQ),
			tracing.WithTag(tracing.TagMQTopic, nativeProducer.topic),
		)
		if localErr != nil {
			// Tracing failed; still deliver the result to the caller.
			if zuper != nil {
				zuper(id, message, err)
			}
			return
		}
		// Deferred so the span is closed even if the user callback panics.
		defer localSpan.End()
		if err != nil {
			// Record the failure on the span that is active in this callback.
			// NOTE(review): the exit span is handed to the invocation context
			// and is presumably already ended when this runs — confirm.
			localSpan.Error(err.Error())
		}
		localSpan.Tag(tracing.TagMQBroker, peer)
		// A failed send may report no message ID; avoid a nil dereference.
		if id != nil {
			localSpan.Tag(tracing.TagMQMsgID, id.String())
		}
		localSpan.SetPeer(peer)
		// The caller may have supplied no callback at all.
		if zuper != nil {
			zuper(id, message, err)
		}
	}
	span.SetPeer(peer)
	invocation.ChangeArg(2, callbackFunc)
	invocation.SetContext(span)
	return nil
}