func()

in consumer/push_consumer.go [1024:1066]


// consumeInner hands a batch of messages to the consume callback registered
// for the batch's topic and returns that callback's result.
//
// The callback is selected using the first message of the batch only. If no
// callback is registered under that message's topic and the topic starts with
// internal.RetryGroupTopicPrefix, the lookup is repeated with the value of the
// message's primitive.PropertyRetryTopic property (the "lost retry message"
// fix).
//
// It returns ConsumeRetryLater together with a non-nil error when subMsgs is
// empty, when no callback can be found, or when the registered value is not a
// *PushConsumerCallback.
//
// When pc.interceptor is nil the callback is invoked directly. Otherwise the
// interceptor is invoked with a final invoker that runs the callback, stores
// its result in the reply holder and, if a consumer context is attached to the
// invoker's ctx, records the outcome on it. The returned error is whatever the
// interceptor returns.
func (pc *pushConsumer) consumeInner(ctx context.Context, subMsgs []*primitive.MessageExt) (ConsumeResult, error) {
	if len(subMsgs) == 0 {
		return ConsumeRetryLater, errors.New("msg list empty")
	}

	topic := subMsgs[0].Topic
	f, exist := pc.consumeFunc.Contains(topic)

	// fix lost retry message: fall back to the topic named by the retry
	// property when the retry-group topic itself has no callback.
	if !exist && strings.HasPrefix(topic, internal.RetryGroupTopicPrefix) {
		f, exist = pc.consumeFunc.Contains(subMsgs[0].GetProperty(primitive.PropertyRetryTopic))
	}

	if !exist {
		return ConsumeRetryLater, fmt.Errorf("the consume callback missing for topic: %s", topic)
	}

	callback, ok := f.(*PushConsumerCallback)
	if !ok {
		return ConsumeRetryLater, fmt.Errorf("the consume callback assert failed for topic: %s", topic)
	}

	if pc.interceptor == nil {
		return callback.f(ctx, subMsgs...)
	}

	var container ConsumeResultHolder
	err := pc.interceptor(ctx, subMsgs, &container, func(ctx context.Context, req, reply interface{}) error {
		// These assertions are unchecked on purpose: an interceptor that
		// forwards a different req/reply type violates the invoker contract
		// and will panic here.
		msgs := req.([]*primitive.MessageExt)
		r, e := callback.f(ctx, msgs...)

		realReply := reply.(*ConsumeResultHolder)
		realReply.ConsumeResult = r

		// The ctx seen here is whatever the interceptor chain passed down, so
		// the consumer context may be absent. Skip the bookkeeping instead of
		// dereferencing a nil pointer.
		msgCtx, found := primitive.GetConsumerCtx(ctx)
		if !found || msgCtx == nil {
			return e
		}

		succeeded := r == ConsumeSuccess
		msgCtx.Success = succeeded

		// Writing to a nil map panics; only record the return type when the
		// context carries a properties map.
		if msgCtx.Properties != nil {
			returnType := string(primitive.FailedReturn)
			if succeeded {
				returnType = string(primitive.SuccessReturn)
			}
			msgCtx.Properties[primitive.PropCtxType] = returnType
		}
		return e
	})
	return container.ConsumeResult, err
}