func newTraceInterceptor()

in consumer/interceptor.go [42:115]


func newTraceInterceptor(dispatcher internal.TraceDispatcher) primitive.Interceptor {
	if dispatcher != nil {
		dispatcher.Start()
	}

	return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
		if dispatcher == nil {
			return fmt.Errorf("GetOrNewRocketMQClient faild")
		}
		consumerCtx, exist := primitive.GetConsumerCtx(ctx)
		if !exist || len(consumerCtx.Msgs) == 0 {
			return next(ctx, req, reply)
		}

		beginT := time.Now()
		// before traceCtx
		traceCx := internal.TraceContext{
			RequestId: primitive.CreateUniqID(),
			TimeStamp: time.Now().UnixNano() / int64(time.Millisecond),
			TraceType: internal.SubBefore,
			GroupName: consumerCtx.ConsumerGroup,
			IsSuccess: true,
		}
		beans := make([]internal.TraceBean, 0)
		for _, msg := range consumerCtx.Msgs {
			if msg == nil {
				continue
			}
			regionID := msg.GetRegionID()
			traceOn := msg.IsTraceOn()
			if traceOn == "false" {
				continue
			}
			bean := internal.TraceBean{
				Topic:      msg.Topic,
				MsgId:      msg.MsgId,
				Tags:       msg.GetTags(),
				Keys:       msg.GetKeys(),
				StoreTime:  msg.StoreTimestamp,
				BodyLength: int(msg.StoreSize),
				RetryTimes: int(msg.ReconsumeTimes),
				ClientHost: utils.LocalIP,
				StoreHost:  utils.LocalIP,
			}
			beans = append(beans, bean)
			traceCx.RegionId = regionID
		}
		if len(beans) > 0 {
			traceCx.TraceBeans = beans
			traceCx.TimeStamp = time.Now().UnixNano() / int64(time.Millisecond)
			dispatcher.Append(traceCx)
		}

		err := next(ctx, req, reply)

		// after traceCtx
		costTime := time.Since(beginT).Nanoseconds() / int64(time.Millisecond)
		ctxType := consumerCtx.Properties[primitive.PropCtxType]
		afterCtx := internal.TraceContext{
			TimeStamp: time.Now().UnixNano() / int64(time.Millisecond),

			TraceType:   internal.SubAfter,
			RegionId:    traceCx.RegionId,
			GroupName:   traceCx.GroupName,
			RequestId:   traceCx.RequestId,
			IsSuccess:   consumerCtx.Success,
			CostTime:    costTime,
			TraceBeans:  traceCx.TraceBeans,
			ContextCode: primitive.ConsumeReturnType(ctxType).Ordinal(),
		}
		dispatcher.Append(afterCtx)
		return err
	}
}