in consumer/push_consumer.go [1024:1066]
func (pc *pushConsumer) consumeInner(ctx context.Context, subMsgs []*primitive.MessageExt) (ConsumeResult, error) {
if len(subMsgs) == 0 {
return ConsumeRetryLater, errors.New("msg list empty")
}
f, exist := pc.consumeFunc.Contains(subMsgs[0].Topic)
// fix lost retry message
if !exist && strings.HasPrefix(subMsgs[0].Topic, internal.RetryGroupTopicPrefix) {
f, exist = pc.consumeFunc.Contains(subMsgs[0].GetProperty(primitive.PropertyRetryTopic))
}
if !exist {
return ConsumeRetryLater, fmt.Errorf("the consume callback missing for topic: %s", subMsgs[0].Topic)
}
callback, ok := f.(*PushConsumerCallback)
if !ok {
return ConsumeRetryLater, fmt.Errorf("the consume callback assert failed for topic: %s", subMsgs[0].Topic)
}
if pc.interceptor == nil {
return callback.f(ctx, subMsgs...)
} else {
var container ConsumeResultHolder
err := pc.interceptor(ctx, subMsgs, &container, func(ctx context.Context, req, reply interface{}) error {
msgs := req.([]*primitive.MessageExt)
r, e := callback.f(ctx, msgs...)
realReply := reply.(*ConsumeResultHolder)
realReply.ConsumeResult = r
msgCtx, _ := primitive.GetConsumerCtx(ctx)
msgCtx.Success = realReply.ConsumeResult == ConsumeSuccess
if realReply.ConsumeResult == ConsumeSuccess {
msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
} else {
msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
}
return e
})
return container.ConsumeResult, err
}
}