in producer/interceptor.go [45:106]
// newTraceInterceptor returns a producer interceptor that, after each send,
// builds a publish (internal.Pub) trace record and appends it to dispatcher.
//
// If dispatcher is non-nil it is started once, when the interceptor is built.
// The returned interceptor:
//   - fails without invoking next when dispatcher is nil or when ctx carries
//     no producer context;
//   - otherwise invokes next and always returns next's error unchanged;
//   - skips tracing for messages sent to the trace topic itself, for sends
//     that carry no usable *primitive.SendResult in reply, and for results
//     with an empty RegionID or with TraceOn unset.
func newTraceInterceptor(dispatcher internal.TraceDispatcher) primitive.Interceptor {
	if dispatcher != nil {
		dispatcher.Start()
	}

	return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
		if dispatcher == nil {
			// Message text kept byte-for-byte in case callers match on it.
			return fmt.Errorf("GetOrNewRocketMQClient faild")
		}

		beginT := time.Now()
		producerCtx, ok := primitive.GetProducerCtx(ctx)
		if !ok {
			return fmt.Errorf("ProducerCtx Not Exist")
		}

		err := next(ctx, req, reply)
		// Capture the elapsed time right after the send returns so the
		// reported cost does not include the bookkeeping below.
		elapsed := time.Since(beginT)

		// Never trace the trace messages themselves.
		if producerCtx.Message.Topic == dispatcher.GetTraceTopicName() {
			return err
		}

		// SendOneway && SendAsync has no reply. The checked assertion also
		// covers a reply of an unexpected type and a typed-nil pointer, both
		// of which would otherwise panic in the send path; tracing is
		// best-effort, so the send's own result is returned instead.
		result, ok := reply.(*primitive.SendResult)
		if !ok || result == nil {
			return err
		}
		if result.RegionID == "" || !result.TraceOn {
			return err
		}

		// Times are expressed in milliseconds.
		costT := elapsed.Nanoseconds() / int64(time.Millisecond)
		// Store time is approximated as the midpoint of the send round trip.
		storeT := beginT.UnixNano()/int64(time.Millisecond) + costT/2

		traceBean := internal.TraceBean{
			Topic:       producerCtx.Message.Topic,
			Tags:        producerCtx.Message.GetTags(),
			Keys:        producerCtx.Message.GetKeys(),
			StoreHost:   producerCtx.BrokerAddr,
			ClientHost:  utils.LocalIP,
			BodyLength:  len(producerCtx.Message.Body),
			MsgType:     producerCtx.MsgType,
			MsgId:       result.MsgID,
			OffsetMsgId: result.OffsetMsgID,
			StoreTime:   storeT,
		}

		traceCtx := internal.TraceContext{
			RequestId:  primitive.CreateUniqID(), // set id
			TimeStamp:  time.Now().UnixNano() / int64(time.Millisecond),
			TraceType:  internal.Pub,
			GroupName:  producerCtx.ProducerGroup,
			RegionId:   result.RegionID,
			TraceBeans: []internal.TraceBean{traceBean},
			CostTime:   costT,
			IsSuccess:  result.Status == primitive.SendOK,
		}
		dispatcher.Append(traceCtx)

		return err
	}
}