in broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java [83:229]
/**
 * Handles a peek request: validates permissions and the requested queue id, peeks messages
 * from the consumer group's pop-retry topic and from the requested topic, and builds the response.
 *
 * <p>When messages were found and the broker is not configured to transfer by heap, the
 * response is written to {@code channel} directly from this method and {@code null} is
 * returned.
 *
 * @param channel            channel the request arrived on; also the target of the direct write
 * @param request            the peek request; its custom header is decoded as a
 *                           {@link PeekMessageRequestHeader}
 * @param brokerAllowSuspend not read by this method
 * @return the response to send back, or {@code null} if the response has already been handed
 *         to the channel by this method
 * @throws RemotingCommandException propagated from the calls made here (presumably from
 *                                  decoding the request header)
 */
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
    throws RemotingCommandException {
    final long beginTimeMills = this.brokerController.getMessageStore().now();
    RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
    final PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader();
    final PeekMessageRequestHeader requestHeader =
        (PeekMessageRequestHeader) request.decodeCommandCustomHeader(PeekMessageRequestHeader.class);
    response.setOpaque(request.getOpaque());

    final BrokerConfig brokerConfig = this.brokerController.getBrokerConfig();

    // ---- Validation: broker permission, topic, queue id, subscription group ----
    if (!PermName.isReadable(brokerConfig.getBrokerPermission())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(String.format("the broker[%s] peeking message is forbidden", brokerConfig.getBrokerIP1()));
        return response;
    }

    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    if (null == topicConfig) {
        LOG.error("The topic {} not exist, consumer: {} ", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
        return response;
    }

    if (!PermName.isReadable(topicConfig.getPerm())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the topic[" + requestHeader.getTopic() + "] peeking message is forbidden");
        return response;
    }

    // Only the upper bound is checked: a negative queue id means "read all queues" below.
    if (requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
        String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
            requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
        LOG.warn(errorInfo);
        response.setCode(ResponseCode.INVALID_PARAMETER);
        response.setRemark(errorInfo);
        return response;
    }

    SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
    if (null == subscriptionGroupConfig) {
        response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
        response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
        return response;
    }

    if (!subscriptionGroupConfig.isConsumeEnable()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
        return response;
    }

    // ---- Peek ----
    // randomQ rotates the starting queue of every scan and picks the revive queue id.
    int randomQ = random.nextInt(100);
    int reviveQid = randomQ % brokerConfig.getReviveQueueNum();
    GetMessageResult getMessageResult = new GetMessageResult(requestHeader.getMaxMsgNums());
    // When randomQ is a multiple of 5 the retry topic is scanned before the normal topic.
    boolean needRetry = randomQ % 5 == 0;
    long popTime = System.currentTimeMillis();
    long restNum = 0;

    if (needRetry) {
        restNum = peekRetryTopicQueues(getMessageResult, requestHeader, randomQ, restNum, reviveQid, channel, popTime);
    }

    if (requestHeader.getQueueId() < 0) {
        // read all queue
        int readQueueNums = topicConfig.getReadQueueNums();
        for (int i = 0; i < readQueueNums; i++) {
            int queueId = (randomQ + i) % readQueueNums;
            restNum = peekMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime);
        }
    } else {
        restNum = peekMsgFromQueue(false, getMessageResult, requestHeader, requestHeader.getQueueId(), restNum, reviveQid, channel, popTime);
    }

    // if not full , fetch retry again
    if (!needRetry && getMessageResult.getMessageMapedList().size() < requestHeader.getMaxMsgNums()) {
        restNum = peekRetryTopicQueues(getMessageResult, requestHeader, randomQ, restNum, reviveQid, channel, popTime);
    }

    // ---- Build the response ----
    final boolean found = !getMessageResult.getMessageBufferList().isEmpty();
    if (found) {
        response.setCode(ResponseCode.SUCCESS);
        getMessageResult.setStatus(GetMessageStatus.FOUND);
    } else {
        response.setCode(ResponseCode.PULL_NOT_FOUND);
        getMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
    }
    responseHeader.setRestNum(restNum);
    response.setRemark(getMessageResult.getStatus().name());

    if (!found) {
        // "No message" is an ordinary outcome: return it as-is. It must not fall into an
        // assertion, which would throw AssertionError whenever the JVM runs with -ea.
        return response;
    }

    this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
        getMessageResult.getMessageCount());
    this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
        getMessageResult.getBufferTotalSize());
    this.brokerController.getBrokerStatsManager().incBrokerGetNums(requestHeader.getTopic(), getMessageResult.getMessageCount());

    if (brokerConfig.isTransferMsgByHeap()) {
        // Heap transfer: copy the messages into the response body and let the caller send it.
        final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
        this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
            requestHeader.getTopic(), requestHeader.getQueueId(),
            (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
        response.setBody(r);
        return response;
    }

    // Direct transfer: write header + message buffers to the channel from here.
    try {
        FileRegion fileRegion =
            new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
        channel.writeAndFlush(fileRegion)
            .addListener((ChannelFutureListener) future -> {
                getMessageResult.release();
                Attributes attributes = RemotingMetricsManager.newAttributesBuilder()
                    .put(LABEL_REQUEST_CODE, RemotingHelper.getRequestCodeDesc(request.getCode()))
                    .put(LABEL_RESPONSE_CODE, RemotingHelper.getResponseCodeDesc(response.getCode()))
                    .put(LABEL_RESULT, RemotingMetricsManager.getWriteAndFlushResult(future))
                    .build();
                RemotingMetricsManager.rpcLatency.record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes);
                if (!future.isSuccess()) {
                    LOG.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), future.cause());
                }
            });
    } catch (Throwable e) {
        LOG.error("Error occurred when transferring messages from page cache", e);
        getMessageResult.release();
    }
    // The response has been handed to the channel (or the attempt failed and was logged);
    // null is returned in both cases, as before.
    return null;
}

/**
 * Peeks every read queue of the pop-retry topic built from the request's topic and consumer
 * group, starting at a queue derived from {@code randomQ}. Does nothing when that retry topic
 * has no topic config.
 *
 * @param getMessageResult accumulator passed through to {@code peekMsgFromQueue}
 * @param requestHeader    the decoded peek request header
 * @param randomQ          offset used to rotate the starting queue
 * @param restNum          running rest count carried into the scan
 * @param reviveQid        revive queue id passed through to {@code peekMsgFromQueue}
 * @param channel          requesting channel, passed through to {@code peekMsgFromQueue}
 * @param popTime          timestamp of this peek, passed through to {@code peekMsgFromQueue}
 * @return the rest count after the scan, or {@code restNum} unchanged if the retry topic
 *         is not configured
 */
private long peekRetryTopicQueues(GetMessageResult getMessageResult, PeekMessageRequestHeader requestHeader,
    int randomQ, long restNum, int reviveQid, Channel channel, long popTime) {
    TopicConfig retryTopicConfig = this.brokerController.getTopicConfigManager()
        .selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(),
            this.brokerController.getBrokerConfig().isEnableRetryTopicV2()));
    if (retryTopicConfig == null) {
        return restNum;
    }
    // Read the queue count once so the loop bound and the modulus cannot disagree.
    int readQueueNums = retryTopicConfig.getReadQueueNums();
    long rest = restNum;
    for (int i = 0; i < readQueueNums; i++) {
        int queueId = (randomQ + i) % readQueueNums;
        rest = peekMsgFromQueue(true, getMessageResult, requestHeader, queueId, rest, reviveQid, channel, popTime);
    }
    return rest;
}