in broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java [661:864]
private CompletableFuture<Long> popMsgFromQueue(String topic, String attemptId, boolean isRetry,
GetMessageResult getMessageResult,
PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid,
Channel channel, long popTime, ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo,
StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {
String lockKey =
topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId;
boolean isOrder = requestHeader.isOrder();
long offset;
try {
offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(),
false, lockKey, false);
} catch (ConsumeQueueException e) {
CompletableFuture<Long> failure = new CompletableFuture<>();
failure.completeExceptionally(e);
return failure;
}
CompletableFuture<Long> future = new CompletableFuture<>();
if (!queueLockManager.tryLock(lockKey)) {
try {
if (!requestHeader.isOrder()) {
restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
}
future.complete(restNum);
} catch (ConsumeQueueException e) {
future.completeExceptionally(e);
}
return future;
}
future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey));
if (isPopShouldStop(topic, requestHeader.getConsumerGroup(), queueId)) {
POP_LOGGER.warn("Too much msgs unacked, then stop popping. topic={}, group={}, queueId={}",
topic, requestHeader.getConsumerGroup(), queueId);
try {
restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
future.complete(restNum);
} catch (ConsumeQueueException e) {
future.completeExceptionally(e);
}
return future;
}
try {
offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(),
true, lockKey, true);
// Current requests would calculate the total number of messages
// waiting to be filtered for new message arrival notifications in
// the long-polling service, need disregarding the backlog in order
// consumption scenario. If rest message num including the blocked
// queue accumulation would lead to frequent unnecessary wake-ups
// of long-polling requests, resulting unnecessary CPU usage.
// When client ack message, long-polling request would be notifications
// by AckMessageProcessor.ackOrderly() and message will not be delayed.
if (isOrder) {
if (brokerController.getConsumerOrderInfoManager().checkBlock(
attemptId, topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime())) {
// should not add accumulation(max offset - consumer offset) here
future.complete(restNum);
return future;
}
this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNum(
topic, requestHeader.getConsumerGroup(), queueId);
}
if (getMessageResult.getMessageMapedList().size() >= requestHeader.getMaxMsgNums()) {
restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
future.complete(restNum);
return future;
}
} catch (Exception e) {
POP_LOGGER.error("Exception in popMsgFromQueue", e);
future.complete(restNum);
return future;
}
AtomicLong atomicRestNum = new AtomicLong(restNum);
AtomicLong atomicOffset = new AtomicLong(offset);
long finalOffset = offset;
return this.brokerController.getMessageStore()
.getMessageAsync(requestHeader.getConsumerGroup(), topic, queueId, offset,
requestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), messageFilter)
.thenCompose(result -> {
if (result == null) {
return CompletableFuture.completedFuture(null);
}
// maybe store offset is not correct.
if (GetMessageStatus.OFFSET_TOO_SMALL.equals(result.getStatus())
|| GetMessageStatus.OFFSET_OVERFLOW_BADLY.equals(result.getStatus())
|| GetMessageStatus.OFFSET_FOUND_NULL.equals(result.getStatus())) {
// commit offset, because the offset is not correct
// If offset in store is greater than cq offset, it will cause duplicate messages,
// because offset in PopBuffer is not committed.
POP_LOGGER.warn("Pop initial offset, because store is no correct, {}, {}->{}",
lockKey, atomicOffset.get(), result.getNextBeginOffset());
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), requestHeader.getConsumerGroup(), topic,
queueId, result.getNextBeginOffset());
atomicOffset.set(result.getNextBeginOffset());
return this.brokerController.getMessageStore().getMessageAsync(requestHeader.getConsumerGroup(), topic, queueId, atomicOffset.get(),
requestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), messageFilter);
}
return CompletableFuture.completedFuture(result);
}).thenApply(result -> {
if (result == null) {
try {
atomicRestNum.set(brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - atomicOffset.get() + atomicRestNum.get());
} catch (ConsumeQueueException e) {
POP_LOGGER.error("Failed to get max offset in queue", e);
}
return atomicRestNum.get();
}
if (!result.getMessageMapedList().isEmpty()) {
this.brokerController.getBrokerStatsManager().incBrokerGetNums(requestHeader.getTopic(), result.getMessageCount());
this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), topic,
result.getMessageCount());
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), topic,
result.getBufferTotalSize());
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
.put(LABEL_TOPIC, requestHeader.getTopic())
.put(LABEL_CONSUMER_GROUP, requestHeader.getConsumerGroup())
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(requestHeader.getTopic()) || MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
.put(LABEL_IS_RETRY, isRetry)
.build();
BrokerMetricsManager.messagesOutTotal.add(result.getMessageCount(), attributes);
BrokerMetricsManager.throughputOutTotal.add(result.getBufferTotalSize(), attributes);
if (isOrder) {
this.brokerController.getConsumerOrderInfoManager().update(requestHeader.getAttemptId(), isRetry, topic,
requestHeader.getConsumerGroup(),
queueId, popTime, requestHeader.getInvisibleTime(), result.getMessageQueueOffset(),
orderCountInfo);
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
requestHeader.getConsumerGroup(), topic, queueId, finalOffset);
} else {
if (!appendCheckPoint(requestHeader, topic, reviveQid, queueId, finalOffset, result, popTime, this.brokerController.getBrokerConfig().getBrokerName())) {
return atomicRestNum.get() + result.getMessageCount();
}
}
ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, topic, queueId, finalOffset);
ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, topic, queueId,
result.getMessageQueueOffset());
} else if ((GetMessageStatus.NO_MATCHED_MESSAGE.equals(result.getStatus())
|| GetMessageStatus.OFFSET_FOUND_NULL.equals(result.getStatus())
|| GetMessageStatus.MESSAGE_WAS_REMOVING.equals(result.getStatus())
|| GetMessageStatus.NO_MATCHED_LOGIC_QUEUE.equals(result.getStatus()))
&& result.getNextBeginOffset() > -1) {
if (isOrder) {
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), requestHeader.getConsumerGroup(), topic,
queueId, result.getNextBeginOffset());
} else {
popBufferMergeService.addCkMock(requestHeader.getConsumerGroup(), topic, queueId, finalOffset,
requestHeader.getInvisibleTime(), popTime, reviveQid, result.getNextBeginOffset(), brokerController.getBrokerConfig().getBrokerName());
}
}
atomicRestNum.set(result.getMaxOffset() - result.getNextBeginOffset() + atomicRestNum.get());
String brokerName = brokerController.getBrokerConfig().getBrokerName();
for (SelectMappedBufferResult mapedBuffer : result.getMessageMapedList()) {
// We should not recode buffer when popResponseReturnActualRetryTopic is true or topic is not retry topic
if (brokerController.getBrokerConfig().isPopResponseReturnActualRetryTopic() || !isRetry) {
getMessageResult.addMessage(mapedBuffer);
} else {
List<MessageExt> messageExtList = MessageDecoder.decodesBatch(mapedBuffer.getByteBuffer(),
true, false, true);
mapedBuffer.release();
for (MessageExt messageExt : messageExtList) {
try {
String ckInfo = ExtraInfoUtil.buildExtraInfo(finalOffset, popTime, requestHeader.getInvisibleTime(),
reviveQid, messageExt.getTopic(), brokerName, messageExt.getQueueId(), messageExt.getQueueOffset());
messageExt.getProperties().putIfAbsent(MessageConst.PROPERTY_POP_CK, ckInfo);
// Set retry message topic to origin topic and clear message store size to recode
messageExt.setTopic(requestHeader.getTopic());
messageExt.setStoreSize(0);
byte[] encode = MessageDecoder.encode(messageExt, false);
ByteBuffer buffer = ByteBuffer.wrap(encode);
SelectMappedBufferResult tmpResult =
new SelectMappedBufferResult(mapedBuffer.getStartOffset(), buffer, encode.length, null);
getMessageResult.addMessage(tmpResult);
} catch (Exception e) {
POP_LOGGER.error("Exception in recode retry message buffer, topic={}", topic, e);
}
}
}
}
this.brokerController.getPopInflightMessageCounter().incrementInFlightMessageNum(
topic,
requestHeader.getConsumerGroup(),
queueId,
result.getMessageCount()
);
return atomicRestNum.get();
}).whenComplete((result, throwable) -> {
if (throwable != null) {
POP_LOGGER.error("Pop message error, {}", lockKey, throwable);
}
queueLockManager.unLock(lockKey);
});
}