func()

in pulsar/consumer_partition.go [1104:1152]


// internalAck builds a CommandAck for req.msgID and sends it on the
// consumer's current connection.
//
// req.doneCh is closed when the function returns, on every path. A failure
// of the send itself is stored in req.err before the channel is closed.
//
// When pc.options.ackWithResponse is set, a fresh request ID is attached to
// the command and it is issued through RequestOnCnx; otherwise it is issued
// through RequestOnCnxNoWait and no request ID is allocated.
func (pc *partitionConsumer) internalAck(req *ackRequest) {
	defer close(req.doneCh)

	// NOTE(review): on this early return req.err is left untouched, so whoever
	// waits on doneCh cannot tell that the ack was dropped — confirm whether
	// callers rely on that, or whether an error should be reported here.
	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
		pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
		return
	}

	msgID := req.msgID
	ackedID := &pb.MessageIdData{
		LedgerId: proto.Uint64(uint64(msgID.ledgerID)),
		EntryId:  proto.Uint64(uint64(msgID.entryID)),
	}
	// The ack set is only attached when batch index ack is enabled and the
	// message carries a tracker; a nil ack set is never written to the command.
	if pc.options.enableBatchIndexAck && msgID.tracker != nil {
		if ackSet := msgID.tracker.toAckSet(); ackSet != nil {
			ackedID.AckSet = ackSet
		}
	}

	cmdAck := &pb.CommandAck{
		ConsumerId: proto.Uint64(pc.consumerID),
		MessageId:  []*pb.MessageIdData{ackedID},
	}

	// Any ackType other than the two handled here leaves AckType unset.
	switch req.ackType {
	case individualAck:
		cmdAck.AckType = pb.CommandAck_Individual.Enum()
	case cumulativeAck:
		cmdAck.AckType = pb.CommandAck_Cumulative.Enum()
	}

	if pc.options.ackWithResponse {
		// The request ID is only needed on this path, so it is allocated here
		// rather than for every ack.
		reqID := pc.client.rpcClient.NewRequestID()
		cmdAck.RequestId = proto.Uint64(reqID)
		_, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), reqID, pb.BaseCommand_ACK, cmdAck)
		if err != nil {
			pc.log.WithError(err).Error("Ack with response error")
			req.err = err
		}
		return
	}

	err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_ACK, cmdAck)
	if err != nil {
		// Attach the actual error so the log shows the real cause, matching
		// the ack-with-response path above.
		pc.log.WithError(err).Error("Connection was closed when request ack cmd")
		req.err = err
	}
}