in broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java [186:309]
private void appendAck(final AckMessageRequestHeader requestHeader, final BatchAck batchAck,
final RemotingCommand response, final Channel channel, String brokerName) throws RemotingCommandException {
String[] extraInfo;
String consumeGroup, topic;
int qId, rqId;
long startOffset, ackOffset;
long popTime, invisibleTime;
AckMsg ackMsg;
int ackCount = 0;
if (batchAck == null) {
// single ack
extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo());
brokerName = ExtraInfoUtil.getBrokerName(extraInfo);
consumeGroup = requestHeader.getConsumerGroup();
topic = requestHeader.getTopic();
qId = requestHeader.getQueueId();
rqId = ExtraInfoUtil.getReviveQid(extraInfo);
startOffset = ExtraInfoUtil.getCkQueueOffset(extraInfo);
ackOffset = requestHeader.getOffset();
popTime = ExtraInfoUtil.getPopTime(extraInfo);
invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo);
if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
ackOrderly(topic, consumeGroup, qId, ackOffset, popTime, invisibleTime, channel, response);
return;
}
ackMsg = new AckMsg();
ackCount = 1;
} else {
// batch ack
consumeGroup = batchAck.getConsumerGroup();
topic = ExtraInfoUtil.getRealTopic(batchAck.getTopic(), batchAck.getConsumerGroup(), batchAck.getRetry());
qId = batchAck.getQueueId();
rqId = batchAck.getReviveQueueId();
startOffset = batchAck.getStartOffset();
ackOffset = -1;
popTime = batchAck.getPopTime();
invisibleTime = batchAck.getInvisibleTime();
long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, qId);
long maxOffset;
try {
maxOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, qId);
} catch (ConsumeQueueException e) {
throw new RemotingCommandException("Failed to get max offset in queue", e);
}
if (minOffset == -1 || maxOffset == -1) {
POP_LOGGER.error("Illegal topic or queue found when batch ack {}", batchAck);
return;
}
BatchAckMsg batchAckMsg = new BatchAckMsg();
BitSet bitSet = batchAck.getBitSet();
for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
if (i == Integer.MAX_VALUE) {
break;
}
long offset = startOffset + i;
if (offset < minOffset || offset > maxOffset) {
continue;
}
if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
ackOrderly(topic, consumeGroup, qId, offset, popTime, invisibleTime, channel, response);
} else {
batchAckMsg.getAckOffsetList().add(offset);
}
}
if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE || batchAckMsg.getAckOffsetList().isEmpty()) {
return;
}
ackMsg = batchAckMsg;
ackCount = batchAckMsg.getAckOffsetList().size();
}
this.brokerController.getBrokerStatsManager().incBrokerAckNums(ackCount);
this.brokerController.getBrokerStatsManager().incGroupAckNums(consumeGroup, topic, ackCount);
ackMsg.setConsumerGroup(consumeGroup);
ackMsg.setTopic(topic);
ackMsg.setQueueId(qId);
ackMsg.setStartOffset(startOffset);
ackMsg.setAckOffset(ackOffset);
ackMsg.setPopTime(popTime);
ackMsg.setBrokerName(brokerName);
if (this.brokerController.getPopMessageProcessor().getPopBufferMergeService().addAk(rqId, ackMsg)) {
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount);
return;
}
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(reviveTopic);
msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(StandardCharsets.UTF_8));
msgInner.setQueueId(rqId);
if (ackMsg instanceof BatchAckMsg) {
msgInner.setTags(PopAckConstants.BATCH_ACK_TAG);
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genBatchAckUniqueId((BatchAckMsg) ackMsg));
} else {
msgInner.setTags(PopAckConstants.ACK_TAG);
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg));
}
msgInner.setBornTimestamp(System.currentTimeMillis());
msgInner.setBornHost(this.brokerController.getStoreHost());
msgInner.setStoreHost(this.brokerController.getStoreHost());
msgInner.setDeliverTimeMs(popTime + invisibleTime);
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg));
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
if (brokerController.getBrokerConfig().isAppendAckAsync()) {
int finalAckCount = ackCount;
this.brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult -> {
handlePutMessageResult(putMessageResult, ackMsg, topic, consumeGroup, popTime, qId, finalAckCount);
}).exceptionally(throwable -> {
handlePutMessageResult(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null, false),
ackMsg, topic, consumeGroup, popTime, qId, finalAckCount);
POP_LOGGER.error("put ack msg error ", throwable);
return null;
});
} else {
PutMessageResult putMessageResult = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
handlePutMessageResult(putMessageResult, ackMsg, topic, consumeGroup, popTime, qId, ackCount);
}
}