in consumer/pull_consumer.go [311:402]
// ACK reports the consume result for the messages carried by cr.
//
// It returns immediately when cr is nil, when cr holds no messages, or when
// its process queue or message queue is nil. Otherwise each attempt:
//
//   - stops (with a log entry) if the process queue has been dropped;
//   - calls resetRetryAndNamespace on the messages and, if an interceptor is
//     configured, invokes it with a context that carries the consume-message
//     context, the ConsumerPull method marker and a concurrently-context;
//   - bumps the OK or failed TPS statistic for the message queue's topic;
//   - on ConsumeSuccess hands every message to removeMessage; on any other
//     result it only logs in BroadCasting mode, and in the other models hands
//     each message to sendMessageBack, collecting the ones that return false;
//   - passes the offset returned by removeMessage to the offset storage when
//     it is non-negative and the process queue is still not dropped.
//
// Messages for which sendMessageBack returned false have ReconsumeTimes
// incremented and are retried after a 5 second pause. The pause is abandoned,
// and ACK returns, as soon as ctx is done.
func (pc *defaultPullConsumer) ACK(ctx context.Context, cr *ConsumeRequest, result ConsumeResult) {
	if cr == nil {
		return
	}
	pq := cr.processQueue
	mq := cr.messageQueue
	msgList := cr.msgList
	if len(msgList) == 0 || pq == nil || mq == nil {
		return
	}

	for {
		if pq.IsDroppd() {
			rlog.Info("defaultPullConsumer the message queue not be able to consume, because it was dropped", map[string]interface{}{
				rlog.LogKeyMessageQueue:  mq.String(),
				rlog.LogKeyConsumerGroup: pc.consumerGroup,
			})
			return
		}
		pc.resetRetryAndNamespace(msgList)

		msgCtx := &primitive.ConsumeMessageContext{
			Properties:    make(map[string]string),
			ConsumerGroup: pc.consumerGroup,
			MQ:            mq,
			Msgs:          msgList,
		}
		// Build the per-attempt context from the caller's ctx rather than from
		// the previous attempt's context, so that repeated retries do not keep
		// stacking value layers (and earlier message lists) on top of each other.
		attemptCtx := primitive.WithConsumerCtx(ctx, msgCtx)
		attemptCtx = primitive.WithMethod(attemptCtx, primitive.ConsumerPull)
		concurrentCtx := primitive.NewConsumeConcurrentlyContext()
		concurrentCtx.MQ = *mq
		attemptCtx = primitive.WithConcurrentlyCtx(attemptCtx, concurrentCtx)

		if result == ConsumeSuccess {
			msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
			msgCtx.Success = true
		} else {
			msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
			msgCtx.Success = false
		}

		if pc.interceptor != nil {
			// The terminal invoker is a no-op: the interceptor is only being
			// notified about the result here.
			pc.interceptor(attemptCtx, msgList, nil, func(ctx context.Context, req, reply interface{}) error {
				return nil
			})
		}

		// Re-check: the queue may have been dropped while the interceptor ran.
		if pq.IsDroppd() {
			rlog.Warning("defaultPullConsumer processQueue is dropped without process consume result.", map[string]interface{}{
				rlog.LogKeyMessageQueue: mq,
				"message":               msgList,
			})
			return
		}

		var msgBackFailed []*primitive.MessageExt
		msgBackSucceed := make([]*primitive.MessageExt, 0)
		if result == ConsumeSuccess {
			pc.stat.increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(msgList))
			msgBackSucceed = msgList
		} else {
			pc.stat.increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(msgList))
			if pc.model == BroadCasting {
				// NOTE(review): these messages are logged as dropped but are not
				// handed to removeMessage below — confirm that is intended.
				for _, msg := range msgList {
					rlog.Warning("defaultPullConsumer BROADCASTING, the message consume failed, drop it", map[string]interface{}{
						"message": msg,
					})
				}
			} else {
				for _, msg := range msgList {
					if pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
						msgBackSucceed = append(msgBackSucceed, msg)
					} else {
						msg.ReconsumeTimes++
						msgBackFailed = append(msgBackFailed, msg)
					}
				}
			}
		}

		offset := pq.removeMessage(msgBackSucceed...)
		if offset >= 0 && !pq.IsDroppd() {
			pc.storage.update(mq, int64(offset), true)
		}

		if len(msgBackFailed) == 0 {
			return
		}

		// Only the messages that could not be sent back are retried.
		msgList = msgBackFailed

		// Pause before retrying, but give up as soon as the caller's context
		// is done instead of sleeping unconditionally.
		timer := time.NewTimer(5 * time.Second)
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			rlog.Warning("defaultPullConsumer ACK stopped retrying failed messages because the context is done", map[string]interface{}{
				rlog.LogKeyMessageQueue:  mq.String(),
				rlog.LogKeyConsumerGroup: pc.consumerGroup,
				"message":                msgList,
				"error":                  ctx.Err(),
			})
			return
		}
	}
}