func()

in pulsar/consumer_partition.go [535:588]


// internalAckWithTxn sends an individual ACK for req.msgID on behalf of
// req.Transaction and records the outcome in req.err (nil on success).
//
// req.doneCh is closed on every return path, so a caller waiting on it is
// always released and can then read req.err.
//
// The request is rejected before any ACK command is sent when:
//   - the consumer is closing or closed (ConsumerClosed), or
//   - the transaction state is not TxnOpen (InvalidStatus).
//
// Errors returned by registerAckTopic, registerSendOrAckOp, or the ACK
// request itself are stored in req.err unchanged.
func (pc *partitionConsumer) internalAckWithTxn(req *ackWithTxnRequest) {
	defer close(req.doneCh)

	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
		pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
		req.err = newError(ConsumerClosed, "Failed to ack by closing or closed consumer")
		return
	}

	// Load the transaction state once so the value that is logged is exactly
	// the value that failed the check, even if the state changes concurrently.
	if txnState := req.Transaction.state.Load(); txnState != int32(TxnOpen) {
		pc.log.WithField("state", txnState).Error("Failed to ack by a non-open transaction.")
		req.err = newError(InvalidStatus, "Failed to ack by a non-open transaction.")
		return
	}

	msgID := req.msgID
	msgIDData := &pb.MessageIdData{
		LedgerId: proto.Uint64(uint64(msgID.ledgerID)),
		EntryId:  proto.Uint64(uint64(msgID.entryID)),
	}
	// Attach the tracker's ack set only when batch-index acking is enabled
	// and the message ID carries a tracker; a nil ack set is not attached.
	if pc.options.enableBatchIndexAck && msgID.tracker != nil {
		if ackSet := msgID.tracker.toAckSet(); ackSet != nil {
			msgIDData.AckSet = ackSet
		}
	}

	reqID := pc.client.rpcClient.NewRequestID()
	txnID := req.Transaction.GetTxnID()
	cmdAck := &pb.CommandAck{
		ConsumerId:     proto.Uint64(pc.consumerID),
		RequestId:      proto.Uint64(reqID),
		MessageId:      []*pb.MessageIdData{msgIDData},
		AckType:        pb.CommandAck_Individual.Enum(),
		TxnidMostBits:  proto.Uint64(txnID.MostSigBits),
		TxnidLeastBits: proto.Uint64(txnID.LeastSigBits),
	}

	// Both registrations must succeed before the ACK is sent; a failure in
	// either aborts the request with that error.
	if err := req.Transaction.registerAckTopic(pc.options.topic, pc.options.subscription); err != nil {
		req.err = err
		return
	}
	if err := req.Transaction.registerSendOrAckOp(); err != nil {
		req.err = err
		return
	}

	_, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), reqID, pb.BaseCommand_ACK, cmdAck)
	if err != nil {
		pc.log.WithError(err).Error("Ack with response error")
	}
	// endSendOrAckOp is called with the request's result whether or not it
	// failed, pairing with the successful registerSendOrAckOp above.
	req.Transaction.endSendOrAckOp(err)
	req.err = err
}