in consumer/interceptor.go [42:115]
// newTraceInterceptor returns a consumer interceptor that reports trace
// contexts to dispatcher around each consume invocation.
//
// A non-nil dispatcher is started immediately. For every invocation whose
// context carries a consumer context with at least one message, the
// interceptor appends a SubBefore trace context before calling next and a
// SubAfter trace context (with cost time, success flag and return type) after
// next returns. Nil messages and messages whose IsTraceOn() is "false" are
// left out of the trace; when no message remains, nothing is reported at all.
//
// The returned interceptor yields an error without calling next when
// dispatcher is nil; otherwise it returns whatever next returns.
func newTraceInterceptor(dispatcher internal.TraceDispatcher) primitive.Interceptor {
	if dispatcher != nil {
		dispatcher.Start()
	}

	return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
		if dispatcher == nil {
			return fmt.Errorf("consumer trace interceptor: trace dispatcher is nil")
		}

		consumerCtx, exist := primitive.GetConsumerCtx(ctx)
		if !exist || len(consumerCtx.Msgs) == 0 {
			return next(ctx, req, reply)
		}

		beginT := time.Now()

		// Collect one trace bean per message that has tracing enabled.
		var regionID string
		beans := make([]internal.TraceBean, 0, len(consumerCtx.Msgs))
		for _, msg := range consumerCtx.Msgs {
			if msg == nil {
				continue
			}
			msgRegionID := msg.GetRegionID()
			if msg.IsTraceOn() == "false" {
				continue
			}
			beans = append(beans, internal.TraceBean{
				Topic:      msg.Topic,
				MsgId:      msg.MsgId,
				Tags:       msg.GetTags(),
				Keys:       msg.GetKeys(),
				StoreTime:  msg.StoreTimestamp,
				BodyLength: int(msg.StoreSize),
				RetryTimes: int(msg.ReconsumeTimes),
				ClientHost: utils.LocalIP,
				StoreHost:  utils.LocalIP,
			})
			// The region of the last traced message is the one reported.
			regionID = msgRegionID
		}

		// Nothing to trace: run the handler without reporting. Without this
		// guard a SubAfter context with no trace beans (and no matching
		// SubBefore context) would be handed to the dispatcher.
		if len(beans) == 0 {
			return next(ctx, req, reply)
		}

		// Report the "before consume" context.
		beforeCtx := internal.TraceContext{
			RequestId:  primitive.CreateUniqID(),
			TimeStamp:  time.Now().UnixNano() / int64(time.Millisecond),
			TraceType:  internal.SubBefore,
			GroupName:  consumerCtx.ConsumerGroup,
			IsSuccess:  true,
			RegionId:   regionID,
			TraceBeans: beans,
		}
		dispatcher.Append(beforeCtx)

		err := next(ctx, req, reply)

		// Report the "after consume" context. Success and the return type are
		// read from the consumer context only after next has returned.
		costTime := int64(time.Since(beginT) / time.Millisecond)
		ctxType := consumerCtx.Properties[primitive.PropCtxType]
		afterCtx := internal.TraceContext{
			TimeStamp:   time.Now().UnixNano() / int64(time.Millisecond),
			TraceType:   internal.SubAfter,
			RegionId:    beforeCtx.RegionId,
			GroupName:   beforeCtx.GroupName,
			RequestId:   beforeCtx.RequestId,
			IsSuccess:   consumerCtx.Success,
			CostTime:    costTime,
			TraceBeans:  beforeCtx.TraceBeans,
			ContextCode: primitive.ConsumeReturnType(ctxType).Ordinal(),
		}
		dispatcher.Append(afterCtx)

		return err
	}
}