in pulsar/consumer_partition.go [535:588]
func (pc *partitionConsumer) internalAckWithTxn(req *ackWithTxnRequest) {
defer close(req.doneCh)
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
req.err = newError(ConsumerClosed, "Failed to ack by closing or closed consumer")
return
}
if req.Transaction.state.Load() != int32(TxnOpen) {
pc.log.WithField("state", req.Transaction.state.Load()).Error("Failed to ack by a non-open transaction.")
req.err = newError(InvalidStatus, "Failed to ack by a non-open transaction.")
return
}
msgID := req.msgID
messageIDs := make([]*pb.MessageIdData, 1)
messageIDs[0] = &pb.MessageIdData{
LedgerId: proto.Uint64(uint64(msgID.ledgerID)),
EntryId: proto.Uint64(uint64(msgID.entryID)),
}
if pc.options.enableBatchIndexAck && msgID.tracker != nil {
ackSet := msgID.tracker.toAckSet()
if ackSet != nil {
messageIDs[0].AckSet = ackSet
}
}
reqID := pc.client.rpcClient.NewRequestID()
txnID := req.Transaction.GetTxnID()
cmdAck := &pb.CommandAck{
ConsumerId: proto.Uint64(pc.consumerID),
MessageId: messageIDs,
AckType: pb.CommandAck_Individual.Enum(),
TxnidMostBits: proto.Uint64(txnID.MostSigBits),
TxnidLeastBits: proto.Uint64(txnID.LeastSigBits),
}
if err := req.Transaction.registerAckTopic(pc.options.topic, pc.options.subscription); err != nil {
req.err = err
return
}
if err := req.Transaction.registerSendOrAckOp(); err != nil {
req.err = err
return
}
cmdAck.RequestId = proto.Uint64(reqID)
_, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), reqID, pb.BaseCommand_ACK, cmdAck)
if err != nil {
pc.log.WithError(err).Error("Ack with response error")
}
req.Transaction.endSendOrAckOp(err)
req.err = err
}