private void appendAck()

in broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java [186:309]


    /**
     * Records the acknowledgement carried either by a single-ack request header or by one
     * {@link BatchAck} entry.
     *
     * <p>Flow, as implemented below:
     * <ol>
     *   <li>Extract group/topic/queue/offset/timing information from {@code requestHeader}
     *       (when {@code batchAck == null}) or from {@code batchAck}.</li>
     *   <li>If the revive queue id equals {@link KeyBuilder#POP_ORDER_REVIVE_QUEUE}, delegate every
     *       acked offset to {@code ackOrderly} and return.</li>
     *   <li>Otherwise build an {@link AckMsg} / {@link BatchAckMsg}, bump the ack statistics and offer
     *       it to the pop buffer merge service via {@code addAk}.</li>
     *   <li>If {@code addAk} returns {@code false}, serialize the ack as JSON and put it to
     *       {@code reviveTopic} (asynchronously or synchronously depending on
     *       {@code isAppendAckAsync()}).</li>
     * </ol>
     *
     * @param requestHeader header of the ack request; read only when {@code batchAck} is {@code null}
     * @param batchAck      batch ack entry, or {@code null} for a single ack
     * @param response      response command handed through to {@code ackOrderly}
     * @param channel       client channel handed through to {@code ackOrderly}
     * @param brokerName    broker name stored in the ack message; for a single ack it is replaced by
     *                      the broker name parsed from the request's extra info
     * @throws RemotingCommandException if the max offset of the queue cannot be read (batch ack only)
     */
    private void appendAck(final AckMessageRequestHeader requestHeader, final BatchAck batchAck,
        final RemotingCommand response, final Channel channel, String brokerName) throws RemotingCommandException {
        String[] extraInfo;
        String consumeGroup, topic;
        int qId, rqId;
        long startOffset, ackOffset;
        long popTime, invisibleTime;
        AckMsg ackMsg;
        int ackCount = 0;
        if (batchAck == null) {
            // single ack: everything except group/topic/queue/offset comes from the extra info string
            extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo());
            brokerName = ExtraInfoUtil.getBrokerName(extraInfo);
            consumeGroup = requestHeader.getConsumerGroup();
            topic = requestHeader.getTopic();
            qId = requestHeader.getQueueId();
            rqId = ExtraInfoUtil.getReviveQid(extraInfo);
            startOffset = ExtraInfoUtil.getCkQueueOffset(extraInfo);
            ackOffset = requestHeader.getOffset();
            popTime = ExtraInfoUtil.getPopTime(extraInfo);
            invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo);

            if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
                // orderly consumption takes a dedicated path; no ack message is built
                ackOrderly(topic, consumeGroup, qId, ackOffset, popTime, invisibleTime, channel, response);
                return;
            }

            ackMsg = new AckMsg();
            ackCount = 1;
        } else {
            // batch ack: acked offsets are encoded as bits relative to startOffset
            consumeGroup = batchAck.getConsumerGroup();
            topic = ExtraInfoUtil.getRealTopic(batchAck.getTopic(), batchAck.getConsumerGroup(), batchAck.getRetry());
            qId = batchAck.getQueueId();
            rqId = batchAck.getReviveQueueId();
            startOffset = batchAck.getStartOffset();
            // a batch carries its offsets in the ack offset list, so the single ack offset is a placeholder
            ackOffset = -1;
            popTime = batchAck.getPopTime();
            invisibleTime = batchAck.getInvisibleTime();

            long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, qId);
            long maxOffset;
            try {
                maxOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, qId);
            } catch (ConsumeQueueException e) {
                throw new RemotingCommandException("Failed to get max offset in queue", e);
            }
            if (minOffset == -1 || maxOffset == -1) {
                POP_LOGGER.error("Illegal topic or queue found when batch ack {}", batchAck);
                return;
            }

            BatchAckMsg batchAckMsg = new BatchAckMsg();
            BitSet bitSet = batchAck.getBitSet();
            for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
                if (i == Integer.MAX_VALUE) {
                    // i + 1 would overflow in the loop update
                    break;
                }
                long offset = startOffset + i;
                if (offset < minOffset || offset > maxOffset) {
                    // skip offsets outside the range currently reported by the message store
                    continue;
                }
                if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
                    ackOrderly(topic, consumeGroup, qId, offset, popTime, invisibleTime, channel, response);
                } else {
                    batchAckMsg.getAckOffsetList().add(offset);
                }
            }
            if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE || batchAckMsg.getAckOffsetList().isEmpty()) {
                // orderly acks were handled inside the loop; an empty list leaves nothing to record
                return;
            }

            ackMsg = batchAckMsg;
            ackCount = batchAckMsg.getAckOffsetList().size();
        }

        this.brokerController.getBrokerStatsManager().incBrokerAckNums(ackCount);
        this.brokerController.getBrokerStatsManager().incGroupAckNums(consumeGroup, topic, ackCount);

        ackMsg.setConsumerGroup(consumeGroup);
        ackMsg.setTopic(topic);
        ackMsg.setQueueId(qId);
        ackMsg.setStartOffset(startOffset);
        ackMsg.setAckOffset(ackOffset);
        ackMsg.setPopTime(popTime);
        ackMsg.setBrokerName(brokerName);

        // If the buffer merge service accepts the ack, only the in-flight counter needs updating here.
        if (this.brokerController.getPopMessageProcessor().getPopBufferMergeService().addAk(rqId, ackMsg)) {
            brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount);
            return;
        }

        // Otherwise write the ack as a message to the revive topic.
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(reviveTopic);
        msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(StandardCharsets.UTF_8));
        msgInner.setQueueId(rqId);
        // Tag and unique key depend on the ack flavour. The key is set exactly once here: writing the
        // single-ack key afterwards would replace the batch key with one derived from the placeholder
        // ack offset (-1) instead of the batch's offset list.
        if (ackMsg instanceof BatchAckMsg) {
            msgInner.setTags(PopAckConstants.BATCH_ACK_TAG);
            msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genBatchAckUniqueId((BatchAckMsg) ackMsg));
        } else {
            msgInner.setTags(PopAckConstants.ACK_TAG);
            msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg));
        }
        msgInner.setBornTimestamp(System.currentTimeMillis());
        msgInner.setBornHost(this.brokerController.getStoreHost());
        msgInner.setStoreHost(this.brokerController.getStoreHost());
        msgInner.setDeliverTimeMs(popTime + invisibleTime);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        if (brokerController.getBrokerConfig().isAppendAckAsync()) {
            // ackCount is reassigned above, so capture an effectively-final copy for the lambdas
            int finalAckCount = ackCount;
            this.brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult -> {
                handlePutMessageResult(putMessageResult, ackMsg, topic, consumeGroup, popTime, qId, finalAckCount);
            }).exceptionally(throwable -> {
                // report the failure through the same handler using a synthetic UNKNOWN_ERROR result
                handlePutMessageResult(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null, false),
                    ackMsg, topic, consumeGroup, popTime, qId, finalAckCount);
                POP_LOGGER.error("put ack msg error ", throwable);
                return null;
            });
        } else {
            PutMessageResult putMessageResult = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
            handlePutMessageResult(putMessageResult, ackMsg, topic, consumeGroup, popTime, qId, ackCount);
        }
    }