func()

in consumer/pull_consumer.go [311:402]


func (pc *defaultPullConsumer) ACK(ctx context.Context, cr *ConsumeRequest, result ConsumeResult) {
	if cr == nil {
		return
	}
	pq := cr.processQueue
	mq := cr.messageQueue
	msgList := cr.msgList
	if len(msgList) == 0 || pq == nil || mq == nil {
		return
	}
RETRY:
	if pq.IsDroppd() {
		rlog.Info("defaultPullConsumer the message queue not be able to consume, because it was dropped", map[string]interface{}{
			rlog.LogKeyMessageQueue:  mq.String(),
			rlog.LogKeyConsumerGroup: pc.consumerGroup,
		})
		return
	}

	pc.resetRetryAndNamespace(msgList)

	msgCtx := &primitive.ConsumeMessageContext{
		Properties:    make(map[string]string),
		ConsumerGroup: pc.consumerGroup,
		MQ:            mq,
		Msgs:          msgList,
	}
	ctx = primitive.WithConsumerCtx(ctx, msgCtx)
	ctx = primitive.WithMethod(ctx, primitive.ConsumerPull)
	concurrentCtx := primitive.NewConsumeConcurrentlyContext()
	concurrentCtx.MQ = *mq
	ctx = primitive.WithConcurrentlyCtx(ctx, concurrentCtx)

	if result == ConsumeSuccess {
		msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
		msgCtx.Success = true
	} else {
		msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
		msgCtx.Success = false
	}

	if pc.interceptor != nil {
		pc.interceptor(ctx, msgList, nil, func(ctx context.Context, req, reply interface{}) error {
			return nil
		})
	}

	if !pq.IsDroppd() {
		msgBackFailed := make([]*primitive.MessageExt, 0)
		msgBackSucceed := make([]*primitive.MessageExt, 0)
		if result == ConsumeSuccess {
			pc.stat.increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(msgList))
			msgBackSucceed = msgList
		} else {
			pc.stat.increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(msgList))
			if pc.model == BroadCasting {
				for i := 0; i < len(msgList); i++ {
					rlog.Warning("defaultPullConsumer BROADCASTING, the message consume failed, drop it", map[string]interface{}{
						"message": msgList[i],
					})
				}
			} else {
				for i := 0; i < len(msgList); i++ {
					msg := msgList[i]
					if pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
						msgBackSucceed = append(msgBackSucceed, msg)
					} else {
						msg.ReconsumeTimes += 1
						msgBackFailed = append(msgBackFailed, msg)
					}
				}
			}
		}

		offset := pq.removeMessage(msgBackSucceed...)

		if offset >= 0 && !pq.IsDroppd() {
			pc.storage.update(mq, int64(offset), true)
		}
		if len(msgBackFailed) > 0 {
			msgList = msgBackFailed
			time.Sleep(5 * time.Second)
			goto RETRY
		}
	} else {
		rlog.Warning("defaultPullConsumer processQueue is dropped without process consume result.", map[string]interface{}{
			rlog.LogKeyMessageQueue: mq,
			"message":               msgList,
		})
	}

}