in broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java [78:223]
/**
 * Converts the outcome of a message lookup into the reply for a pull request.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>fill in the response header via {@code processor.composeResponseHeader};</li>
 *   <li>run {@code processor.executeConsumeMessageHookBefore}; an {@link AbortProcessException}
 *       ends processing with the code and remark carried by the exception;</li>
 *   <li>apply {@code processor.rewriteResponseForStaticTopic}, which may replace the response;</li>
 *   <li>call {@code processor.updateBroadcastPulledOffset} and {@code processor.tryCommitOffset};</li>
 *   <li>dispatch on the final response code (see below).</li>
 * </ol>
 *
 * <p>Handling per response code:
 * <ul>
 *   <li>{@code SUCCESS}: update stats and metrics, then either copy the messages into the
 *       response body (when {@code transferMsgByHeap} is enabled) or write a
 *       {@code ManyMessageTransfer} directly to the channel;</li>
 *   <li>{@code PULL_NOT_FOUND}: hand the request to the pull-request hold service when
 *       suspension is allowed and requested, otherwise return the response as is;</li>
 *   <li>{@code PULL_RETRY_IMMEDIATELY}: return the response as is;</li>
 *   <li>{@code PULL_OFFSET_MOVED}: log the correction, or downgrade the response to
 *       {@code PULL_RETRY_IMMEDIATELY} on a slave that does not check offsets.</li>
 * </ul>
 *
 * @param getMessageResult        result of the store lookup; released here on every path that
 *                                finishes with it without handing it on
 * @param request                 the incoming pull request command
 * @param requestHeader           decoded header of {@code request}
 * @param channel                 channel the request arrived on and the reply is written to
 * @param subscriptionData        subscription stored with a suspended pull request
 * @param subscriptionGroupConfig configuration of the consumer group
 * @param brokerAllowSuspend      whether the request may be suspended by the broker
 * @param messageFilter           filter stored with a suspended pull request
 * @param response                response command to populate; may be replaced by the
 *                                static-topic rewrite
 * @param mappingContext          context passed to the static-topic rewrite
 * @param beginTimeMills          start timestamp used to compute the group get latency
 * @return the response to send back, or {@code null} when the request was ignored because the
 *         channel is not writable, the reply was written to the channel directly, or the
 *         request was suspended
 */
public RemotingCommand handle(final GetMessageResult getMessageResult,
    final RemotingCommand request,
    final PullMessageRequestHeader requestHeader,
    final Channel channel,
    final SubscriptionData subscriptionData,
    final SubscriptionGroupConfig subscriptionGroupConfig,
    final boolean brokerAllowSuspend,
    final MessageFilter messageFilter,
    RemotingCommand response,
    TopicQueueMappingContext mappingContext,
    long beginTimeMills) {
    PullMessageProcessor processor = brokerController.getPullMessageProcessor();
    final String clientAddress = RemotingHelper.parseChannelRemoteAddr(channel);
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    processor.composeResponseHeader(requestHeader, getMessageResult, topicConfig.getTopicSysFlag(),
        subscriptionGroupConfig, response, clientAddress);

    try {
        processor.executeConsumeMessageHookBefore(request, requestHeader, getMessageResult, brokerAllowSuspend, response.getCode());
    } catch (AbortProcessException e) {
        response.setCode(e.getResponseCode());
        response.setRemark(e.getErrorMessage());
        // None of the release() calls further down are reached on this path,
        // so the result has to be released here before bailing out.
        getMessageResult.release();
        return response;
    }

    //rewrite the response for the static topic
    final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
    RemotingCommand rewriteResult = processor.rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext, response.getCode());
    if (rewriteResult != null) {
        response = rewriteResult;
    }

    // Read once; used by every branch below.
    final String topic = requestHeader.getTopic();
    final String consumerGroup = requestHeader.getConsumerGroup();

    processor.updateBroadcastPulledOffset(topic, consumerGroup,
        requestHeader.getQueueId(), requestHeader, channel, response, getMessageResult.getNextBeginOffset());
    processor.tryCommitOffset(brokerAllowSuspend, requestHeader, getMessageResult.getNextBeginOffset(),
        clientAddress);

    switch (response.getCode()) {
        case ResponseCode.SUCCESS:
            this.brokerController.getBrokerStatsManager().incGroupGetNums(consumerGroup, topic,
                getMessageResult.getMessageCount());
            this.brokerController.getBrokerStatsManager().incGroupGetSize(consumerGroup, topic,
                getMessageResult.getBufferTotalSize());
            this.brokerController.getBrokerStatsManager().incBrokerGetNums(topic, getMessageResult.getMessageCount());

            // Retry and DLQ topics are excluded from the outbound metrics.
            if (!BrokerMetricsManager.isRetryOrDlqTopic(topic)) {
                Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
                    .put(LABEL_TOPIC, topic)
                    .put(LABEL_CONSUMER_GROUP, consumerGroup)
                    .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic) || MixAll.isSysConsumerGroup(consumerGroup))
                    .build();
                BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), attributes);
                BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes);
            }

            if (!channelIsWritable(channel, requestHeader)) {
                getMessageResult.release();
                //ignore pull request
                return null;
            }

            if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                // Heap transfer: the messages become the body of the regular response.
                final byte[] r = this.readGetMessageResult(getMessageResult, consumerGroup, topic, requestHeader.getQueueId());
                this.brokerController.getBrokerStatsManager().incGroupGetLatency(consumerGroup,
                    topic, requestHeader.getQueueId(),
                    (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
                response.setBody(r);
                return response;
            } else {
                // Direct transfer: header and messages are written to the channel from here.
                try {
                    FileRegion fileRegion =
                        new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
                    // 'response' is reassigned above, so the lambda needs its own effectively-final copy.
                    RemotingCommand finalResponse = response;
                    channel.writeAndFlush(fileRegion)
                        .addListener((ChannelFutureListener) future -> {
                            getMessageResult.release();
                            Attributes attributes = RemotingMetricsManager.newAttributesBuilder()
                                .put(LABEL_REQUEST_CODE, RemotingHelper.getRequestCodeDesc(request.getCode()))
                                .put(LABEL_RESPONSE_CODE, RemotingHelper.getResponseCodeDesc(finalResponse.getCode()))
                                .put(LABEL_RESULT, RemotingMetricsManager.getWriteAndFlushResult(future))
                                .build();
                            RemotingMetricsManager.rpcLatency.record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes);
                            if (!future.isSuccess()) {
                                log.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), future.cause());
                            }
                        });
                } catch (Throwable e) {
                    log.error("Error occurred when transferring messages from page cache", e);
                    getMessageResult.release();
                }
                // The reply was already written (or the failure logged) above.
                // NOTE(review): presumably the caller sends nothing when it gets null - confirm.
                return null;
            }
        case ResponseCode.PULL_NOT_FOUND:
            final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
            final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;

            if (brokerAllowSuspend && hasSuspendFlag) {
                // With long polling disabled the configured short polling time replaces the client's timeout.
                long pollingTimeMills = suspendTimeoutMillisLong;
                if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                }

                long offset = requestHeader.getQueueOffset();
                int queueId = requestHeader.getQueueId();
                PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                    this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
                this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                return null;
            }
            // Not suspended: the response is returned unchanged, same as PULL_RETRY_IMMEDIATELY.
            // fall through
        case ResponseCode.PULL_RETRY_IMMEDIATELY:
            break;
        case ResponseCode.PULL_OFFSET_MOVED:
            if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
                || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
                // NOTE(review): within this method the event is only read back for the log line
                // below; it is not published anywhere here.
                MessageQueue mq = new MessageQueue();
                mq.setTopic(topic);
                mq.setQueueId(requestHeader.getQueueId());
                mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());

                OffsetMovedEvent event = new OffsetMovedEvent();
                event.setConsumerGroup(consumerGroup);
                event.setMessageQueue(mq);
                event.setOffsetRequest(requestHeader.getQueueOffset());
                event.setOffsetNew(getMessageResult.getNextBeginOffset());
                log.warn(
                    "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
                    topic, consumerGroup, event.getOffsetRequest(), event.getOffsetNew(),
                    responseHeader.getSuggestWhichBrokerId());
            } else {
                // Slave without offset checking: redirect the client and ask it to retry at once.
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
                    topic, consumerGroup, requestHeader.getQueueOffset(),
                    responseHeader.getSuggestWhichBrokerId());
            }
            break;
        default:
            log.warn("[BUG] impossible result code of get message: {}", response.getCode());
            assert false;
    }

    return response;
}