in pulsar/consumer_partition.go [1104:1152]
func (pc *partitionConsumer) internalAck(req *ackRequest) {
defer close(req.doneCh)
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
return
}
msgID := req.msgID
messageIDs := make([]*pb.MessageIdData, 1)
messageIDs[0] = &pb.MessageIdData{
LedgerId: proto.Uint64(uint64(msgID.ledgerID)),
EntryId: proto.Uint64(uint64(msgID.entryID)),
}
if pc.options.enableBatchIndexAck && msgID.tracker != nil {
ackSet := msgID.tracker.toAckSet()
if ackSet != nil {
messageIDs[0].AckSet = ackSet
}
}
reqID := pc.client.rpcClient.NewRequestID()
cmdAck := &pb.CommandAck{
ConsumerId: proto.Uint64(pc.consumerID),
MessageId: messageIDs,
}
switch req.ackType {
case individualAck:
cmdAck.AckType = pb.CommandAck_Individual.Enum()
case cumulativeAck:
cmdAck.AckType = pb.CommandAck_Cumulative.Enum()
}
if pc.options.ackWithResponse {
cmdAck.RequestId = proto.Uint64(reqID)
_, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), reqID, pb.BaseCommand_ACK, cmdAck)
if err != nil {
pc.log.WithError(err).Error("Ack with response error")
req.err = err
}
return
}
err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_ACK, cmdAck)
if err != nil {
pc.log.Error("Connection was closed when request ack cmd")
req.err = err
}
}