in broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java [320:427]
/**
 * Asynchronously pops messages for a consumer group from a topic and, for non-FIFO
 * requests, from the group's pop retry topics.
 *
 * <p>If the topic has no config, or the consumer lock for {@code (groupId, topicId)}
 * cannot be acquired, an already completed future holding an empty context is returned
 * and nothing is read. Otherwise a chain of {@code getMessageAsync} steps is built on
 * the same context:
 * <ul>
 *   <li>every fifth request per {@code groupId@topicId} (tracked in
 *       {@code requestCountTable}) reads the retry topics <em>before</em> the normal
 *       topic, unless {@code fifo} is set;</li>
 *   <li>when {@code queueId} is {@code -1}, all read queues of the topic are visited,
 *       starting at an offset derived from the request counter, and on the remaining
 *       non-FIFO requests the retry topics are read <em>after</em> the normal queues;</li>
 *   <li>otherwise only the requested queue of the normal topic is read.</li>
 * </ul>
 * Once the chain completes with found, non-FIFO messages, the pop records are written
 * to the cache (when buffer merge is enabled and the cache is present and not full)
 * or to the store. The consumer lock is released when the chain completes, whether
 * normally or exceptionally.
 *
 * @param clientHost    client address, recorded in the context and passed to each read
 * @param popTime       pop timestamp, recorded in the context
 * @param invisibleTime invisible duration, recorded in the context and used when
 *                      re-encoding retry messages
 * @param groupId       consumer group
 * @param topicId       topic to pop from
 * @param queueId       queue to read, or {@code -1} to read every read queue of the topic
 * @param batchSize     batch size passed to each read
 * @param fifo          when {@code true}, retry topics are skipped and no pop records
 *                      are written by this method
 * @param attemptId     attempt identifier, recorded in the context
 * @param initMode      init mode, recorded in the context
 * @param filter        message filter passed to each read
 * @return a future of the pop context; completes exceptionally if the read chain fails
 */
public CompletableFuture<PopConsumerContext> popAsync(String clientHost, long popTime, long invisibleTime,
    String groupId, String topicId, int queueId, int batchSize, boolean fifo, String attemptId, int initMode,
    MessageFilter filter) {

    PopConsumerContext popConsumerContext =
        new PopConsumerContext(clientHost, popTime, invisibleTime, groupId, fifo, initMode, attemptId);

    TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(topicId);
    if (topicConfig == null || !consumerLockService.tryLock(groupId, topicId)) {
        return CompletableFuture.completedFuture(popConsumerContext);
    }

    log.debug("PopConsumerService popAsync, groupId={}, topicId={}, queueId={}, " +
            "batchSize={}, invisibleTime={}, fifo={}, attemptId={}, filter={}",
        groupId, topicId, queueId, batchSize, invisibleTime, fifo, attemptId, filter);

    String requestKey = groupId + "@" + topicId;
    String retryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topicId, groupId);
    String retryTopicV2 = KeyBuilder.buildPopRetryTopicV2(topicId, groupId);
    long requestCount = Objects.requireNonNull(ConcurrentHashMapUtils.computeIfAbsent(
        requestCountTable, requestKey, k -> new AtomicLong(0L))).getAndIncrement();
    // One request in five reads the retry topics first.
    boolean preferRetry = requestCount % 5L == 0L;

    CompletableFuture<PopConsumerContext> getMessageFuture =
        CompletableFuture.completedFuture(popConsumerContext);

    try {
        if (!fifo && preferRetry) {
            if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
                getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId,
                    retryTopicV1, 0, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V1);
            }
            if (brokerConfig.isEnableRetryTopicV2()) {
                getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId,
                    retryTopicV2, 0, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V2);
            }
        }

        if (queueId != -1) {
            getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId,
                topicId, queueId, batchSize, filter, PopConsumerRecord.RetryType.NORMAL_TOPIC);
        } else {
            // Rotate the starting queue with the request counter so that successive
            // requests do not always begin at queue 0.
            for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
                int current = (int) ((requestCount + i) % topicConfig.getReadQueueNums());
                getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId,
                    topicId, current, batchSize, filter, PopConsumerRecord.RetryType.NORMAL_TOPIC);
            }
            if (!fifo && !preferRetry) {
                if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
                    getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId,
                        retryTopicV1, 0, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V1);
                }
                if (brokerConfig.isEnableRetryTopicV2()) {
                    getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId,
                        retryTopicV2, 0, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V2);
                }
            }
        }

        return getMessageFuture.thenCompose(result -> {
            if (result.isFound() && !result.isFifo()) {
                if (brokerConfig.isEnablePopBufferMerge() &&
                    popConsumerCache != null && !popConsumerCache.isCacheFull()) {
                    this.popConsumerCache.writeRecords(result.getPopConsumerRecordList());
                } else {
                    this.popConsumerStore.writeRecords(result.getPopConsumerRecordList());
                }

                for (int i = 0; i < result.getGetMessageResultList().size(); i++) {
                    GetMessageResult getMessageResult = result.getGetMessageResultList().get(i);
                    PopConsumerRecord popConsumerRecord = result.getPopConsumerRecordList().get(i);

                    // Results that came from a retry topic are re-encoded when
                    // popResponseReturnActualRetryTopic is enabled.
                    // NOTE(review): the previous comment here stated the opposite, i.e. that
                    // the buffer should NOT be re-encoded when popResponseReturnActualRetryTopic
                    // is true. The condition is left unchanged; confirm which one is intended.
                    boolean recode = brokerConfig.isPopResponseReturnActualRetryTopic();
                    if (recode && popConsumerRecord.isRetry()) {
                        result.getGetMessageResultList().set(i, this.recodeRetryMessage(
                            getMessageResult, popConsumerRecord.getTopicId(),
                            popConsumerRecord.getQueueId(), result.getPopTime(), invisibleTime));
                    }
                }
            }
            return CompletableFuture.completedFuture(result);
        }).whenComplete((result, throwable) -> {
            try {
                if (throwable != null) {
                    log.error("PopConsumerService popAsync get message error",
                        throwable instanceof CompletionException ? throwable.getCause() : throwable);
                }
                // On exceptional completion result is null, so it must be checked
                // before it is dereferenced.
                if (result != null && result.getMessageCount() > 0) {
                    log.debug("PopConsumerService popAsync result, found={}, groupId={}, topicId={}, queueId={}, " +
                            "batchSize={}, invisibleTime={}, fifo={}, attemptId={}, filter={}", result.getMessageCount(),
                        groupId, topicId, queueId, batchSize, invisibleTime, fifo, attemptId, filter);
                }
            } finally {
                consumerLockService.unlock(groupId, topicId);
            }
        });
    } catch (Throwable t) {
        log.error("PopConsumerService popAsync error", t);
        // Building the chain failed before the unlocking callback above was attached.
        // Release the lock once whatever was already chained has settled, otherwise
        // it would never be unlocked by this method.
        getMessageFuture.whenComplete((ignoredResult, ignoredError) ->
            consumerLockService.unlock(groupId, topicId));
    }

    return getMessageFuture;
}