in broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java [220:634]
/**
 * Handles a POP message request: validates it against broker, topic and subscription group
 * settings, builds the subscription data and message filter, then pops messages asynchronously.
 *
 * <p>Every failed validation is answered synchronously by returning the response command with the
 * matching error code. Once validation has passed this method always returns {@code null}; the
 * response is produced later inside a {@link CompletableFuture} callback, where it is either handed
 * to {@code NettyRemotingAbstract.writeResponse}, written directly to the channel through a
 * {@code ManyMessageTransfer}, or not produced at all when the request was accepted by the
 * long-polling service.
 *
 * @param ctx     channel handler context of the requesting client
 * @param request the POP request command
 * @return the error response when a validation fails, otherwise {@code null}
 * @throws RemotingCommandException presumably raised while decoding the request header
 */
public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    final long beginTimeMills = this.brokerController.getMessageStore().now();
    // Fill born time into the ext fields if it does not exist, and overwrite it when it is "0"
    // (original author's open question: why do we need this?)
    request.addExtFieldIfNotExist(BORN_TIME, String.valueOf(System.currentTimeMillis()));
    if (Objects.equals(request.getExtFields().get(BORN_TIME), "0")) {
        request.addExtField(BORN_TIME, String.valueOf(System.currentTimeMillis()));
    }
    Channel channel = ctx.channel();
    RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
    response.setOpaque(request.getOpaque());
    final PopMessageRequestHeader requestHeader =
        request.decodeCommandCustomHeader(PopMessageRequestHeader.class, true);
    final PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader();
    // Pop mode only supports consumption in cluster load balancing mode
    brokerController.getConsumerManager().compensateBasicConsumerInfo(
        requestHeader.getConsumerGroup(), ConsumeType.CONSUME_POP, MessageModel.CLUSTERING);
    if (brokerController.getBrokerConfig().isEnablePopLog()) {
        POP_LOGGER.info("receive PopMessage request command, {}", request);
    }

    // ---- Synchronous validation: each failed precondition returns an error response immediately ----
    if (requestHeader.isTimeoutTooMuch()) {
        response.setCode(ResponseCode.POLLING_TIMEOUT);
        response.setRemark(String.format("the broker[%s] pop message is timeout too much",
            this.brokerController.getBrokerConfig().getBrokerIP1()));
        return response;
    }
    if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(String.format("the broker[%s] pop message is forbidden",
            this.brokerController.getBrokerConfig().getBrokerIP1()));
        return response;
    }
    if (requestHeader.getMaxMsgNums() > 32) {
        response.setCode(ResponseCode.INVALID_PARAMETER);
        response.setRemark(String.format("the broker[%s] pop message's num is greater than 32",
            this.brokerController.getBrokerConfig().getBrokerIP1()));
        return response;
    }
    if (!brokerController.getMessageStore().getMessageStoreConfig().isTimerWheelEnable()) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("the broker[%s] pop message is forbidden because timerWheelEnable is false",
            this.brokerController.getBrokerConfig().getBrokerIP1()));
        return response;
    }
    TopicConfig topicConfig =
        this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    if (null == topicConfig) {
        POP_LOGGER.error("The topic {} not exist, consumer: {} ", requestHeader.getTopic(),
            RemotingHelper.parseChannelRemoteAddr(channel));
        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(),
            FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
        return response;
    }
    if (!PermName.isReadable(topicConfig.getPerm())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        // NOTE(review): the remark says "peeking" although this is the pop path; wording kept unchanged.
        response.setRemark("the topic[" + requestHeader.getTopic() + "] peeking message is forbidden");
        return response;
    }
    // Only an upper bound is checked here; a negative queue id means "read all queues" further below.
    if (requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
        String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] " +
                "consumer:[%s]",
            requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(),
            channel.remoteAddress());
        POP_LOGGER.warn(errorInfo);
        response.setCode(ResponseCode.INVALID_PARAMETER);
        response.setRemark(errorInfo);
        return response;
    }
    SubscriptionGroupConfig subscriptionGroupConfig =
        this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
    if (null == subscriptionGroupConfig) {
        response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
        response.setRemark(String.format("subscription group [%s] does not exist, %s",
            requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
        return response;
    }
    if (!subscriptionGroupConfig.isConsumeEnable()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
        return response;
    }

    // ---- Subscription data and message filter ----
    BrokerConfig brokerConfig = brokerController.getBrokerConfig();
    SubscriptionData subscriptionData = null;
    ExpressionMessageFilter messageFilter = null;
    if (requestHeader.getExp() != null && !requestHeader.getExp().isEmpty()) {
        // The request carries a filter expression: register it for the origin topic, subscribe the
        // retry topic with SUB_ALL, and build a message filter from the expression.
        try {
            // origin topic
            subscriptionData = FilterAPI.build(
                requestHeader.getTopic(), requestHeader.getExp(), requestHeader.getExpType());
            brokerController.getConsumerManager().compensateSubscribeData(
                requestHeader.getConsumerGroup(), requestHeader.getTopic(), subscriptionData);
            // retry topic
            String retryTopic = KeyBuilder.buildPopRetryTopic(
                requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
            SubscriptionData retrySubscriptionData = FilterAPI.build(
                retryTopic, SubscriptionData.SUB_ALL, requestHeader.getExpType());
            brokerController.getConsumerManager().compensateSubscribeData(
                requestHeader.getConsumerGroup(), retryTopic, retrySubscriptionData);
            ConsumerFilterData consumerFilterData = null;
            // Non-tag expressions additionally need consumer filter data built from the expression.
            if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                consumerFilterData = ConsumerFilterManager.build(
                    requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getExp(),
                    requestHeader.getExpType(), System.currentTimeMillis());
                if (consumerFilterData == null) {
                    POP_LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}",
                        requestHeader.getExp(), requestHeader.getConsumerGroup());
                    response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
                    response.setRemark("parse the consumer's subscription failed");
                    return response;
                }
            }
            messageFilter = new ExpressionMessageFilter(
                subscriptionData, consumerFilterData, brokerController.getConsumerFilterManager());
        } catch (Exception e) {
            // Pass the exception as the trailing argument so the cause and stack trace are logged.
            POP_LOGGER.warn("Parse the consumer's subscription[{}] error, group: {}", requestHeader.getExp(),
                requestHeader.getConsumerGroup(), e);
            response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
            response.setRemark("parse the consumer's subscription failed");
            return response;
        }
    } else {
        // No expression: subscribe both origin and retry topic with "*" / TAG. A failure here is
        // deliberately best-effort: it is logged and the request continues without a filter.
        try {
            // origin topic
            subscriptionData = FilterAPI.build(requestHeader.getTopic(), "*", ExpressionType.TAG);
            brokerController.getConsumerManager().compensateSubscribeData(
                requestHeader.getConsumerGroup(), requestHeader.getTopic(), subscriptionData);
            // retry topic
            String retryTopic = KeyBuilder.buildPopRetryTopic(
                requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
            SubscriptionData retrySubscriptionData = FilterAPI.build(retryTopic, "*", ExpressionType.TAG);
            brokerController.getConsumerManager().compensateSubscribeData(
                requestHeader.getConsumerGroup(), retryTopic, retrySubscriptionData);
        } catch (Exception e) {
            // Keep going, but do not lose the cause of the failure.
            POP_LOGGER.warn("Build default subscription error, group: {}", requestHeader.getConsumerGroup(), e);
        }
    }
    GetMessageResult getMessageResult = new GetMessageResult(requestHeader.getMaxMsgNums());
    // Effectively-final copies for use inside the lambdas below.
    ExpressionMessageFilter finalMessageFilter = messageFilter;
    SubscriptionData finalSubscriptionData = subscriptionData;

    // ---- Path 1: pop through the pop consumer (KV) service when it is enabled ----
    if (brokerConfig.isPopConsumerKVServiceEnable()) {
        CompletableFuture<PopConsumerContext> popAsyncFuture = brokerController.getPopConsumerService().popAsync(
            RemotingHelper.parseChannelRemoteAddr(channel), beginTimeMills, requestHeader.getInvisibleTime(),
            requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(),
            requestHeader.getMaxMsgNums(), requestHeader.isOrder(),
            requestHeader.getAttemptId(), requestHeader.getInitMode(), messageFilter);
        popAsyncFuture.thenApply(result -> {
            if (result.isFound()) {
                response.setCode(ResponseCode.SUCCESS);
                getMessageResult.setStatus(GetMessageStatus.FOUND);
                // recursive processing
                if (result.getRestCount() > 0) {
                    popLongPollingService.notifyMessageArriving(
                        requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getConsumerGroup(),
                        null, 0L, null, null);
                }
            } else {
                POP_LOGGER.debug("Processor not found, polling request, popTime={}, restCount={}",
                    result.getPopTime(), result.getRestCount());
                PollingResult pollingResult = popLongPollingService.polling(
                    ctx, request, new PollingHeader(requestHeader), finalSubscriptionData, finalMessageFilter);
                if (PollingResult.POLLING_SUC == pollingResult) {
                    // recursive processing
                    if (result.getRestCount() > 0) {
                        popLongPollingService.notifyMessageArriving(
                            requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getConsumerGroup(),
                            null, 0L, null, null);
                    }
                    // The request was accepted by long polling, so no response is built here; null is
                    // handed to the writeResponse stage below.
                    // NOTE(review): presumably writeResponse treats null as "nothing to write" - confirm.
                    return null;
                } else if (PollingResult.POLLING_FULL == pollingResult) {
                    response.setCode(ResponseCode.POLLING_FULL);
                } else {
                    response.setCode(ResponseCode.POLLING_TIMEOUT);
                }
                getMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
            }
            responseHeader.setPopTime(result.getPopTime());
            responseHeader.setInvisibleTime(result.getInvisibleTime());
            responseHeader.setReviveQid(
                requestHeader.isOrder() ? KeyBuilder.POP_ORDER_REVIVE_QUEUE : 0);
            responseHeader.setRestNum(result.getRestCount());
            responseHeader.setStartOffsetInfo(result.getStartOffsetInfo());
            responseHeader.setMsgOffsetInfo(result.getMsgOffsetInfo());
            if (requestHeader.isOrder() && !result.getOrderCountInfo().isEmpty()) {
                responseHeader.setOrderCountInfo(result.getOrderCountInfo());
            }
            response.setRemark(getMessageResult.getStatus().name());
            if (response.getCode() != ResponseCode.SUCCESS) {
                return response;
            }
            // add message
            result.getGetMessageResultList().forEach(temp -> {
                for (int i = 0; i < temp.getMessageMapedList().size(); i++) {
                    getMessageResult.addMessage(temp.getMessageMapedList().get(i));
                }
            });
            if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                // Heap transfer: read the messages into a byte array and attach it as the response body.
                final byte[] r = this.readGetMessageResult(getMessageResult,
                    requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
                this.brokerController.getBrokerStatsManager().incGroupGetLatency(
                    requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(),
                    (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
                response.setBody(r);
            } else {
                // Direct transfer: header and message buffers are written to the channel right here,
                // so null is returned instead of the response.
                final GetMessageResult tmpGetMessageResult = getMessageResult;
                try {
                    FileRegion fileRegion = new ManyMessageTransfer(
                        response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
                    channel.writeAndFlush(fileRegion)
                        .addListener((ChannelFutureListener) future -> {
                            // Release the message buffers once the write has finished, success or not.
                            tmpGetMessageResult.release();
                            Attributes attributes = RemotingMetricsManager.newAttributesBuilder()
                                .put(LABEL_REQUEST_CODE, RemotingHelper.getRequestCodeDesc(request.getCode()))
                                .put(LABEL_RESPONSE_CODE, RemotingHelper.getResponseCodeDesc(response.getCode()))
                                .put(LABEL_RESULT, RemotingMetricsManager.getWriteAndFlushResult(future))
                                .build();
                            RemotingMetricsManager.rpcLatency.record(
                                request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes);
                            if (!future.isSuccess()) {
                                POP_LOGGER.error("Fail to transfer messages from page cache to {}",
                                    channel.remoteAddress(), future.cause());
                            }
                        });
                } catch (Throwable e) {
                    POP_LOGGER.error("Error occurred when transferring messages from page cache", e);
                    getMessageResult.release();
                }
                return null;
            }
            return response;
        }).thenAccept(result -> NettyRemotingAbstract.writeResponse(channel, request, result));
        return null;
    }

    // ---- Path 2: pop directly from the topic / retry topic queues ----
    int randomQ = random.nextInt(100);
    int reviveQid;
    if (requestHeader.isOrder()) {
        reviveQid = KeyBuilder.POP_ORDER_REVIVE_QUEUE;
    } else {
        // Spread requests over the revive queues using the running counter.
        reviveQid = (int) Math.abs(ckMessageNumber.getAndIncrement() %
            this.brokerController.getBrokerConfig().getReviveQueueNum());
    }
    StringBuilder startOffsetInfo = new StringBuilder(64);
    StringBuilder msgOffsetInfo = new StringBuilder(64);
    StringBuilder orderCountInfo = requestHeader.isOrder() ? new StringBuilder(64) : null;
    // Due to the design of the fields startOffsetInfo, msgOffsetInfo, and orderCountInfo,
    // a single POP request could only invoke the popMsgFromQueue method once
    // for either a normal topic or a retry topic's queue. Retry topics v1 and v2 are
    // considered the same type because they share the same retry flag in previous fields.
    // Therefore, needRetryV1 is designed as a subset of needRetry, and within a single request,
    // only one type of retry topic is able to call popMsgFromQueue.
    boolean needRetry = randomQ < brokerConfig.getPopFromRetryProbability();
    boolean needRetryV1 = false;
    if (brokerConfig.isEnableRetryTopicV2() && brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
        needRetryV1 = randomQ % 2 == 0;
    }
    long popTime = System.currentTimeMillis();
    // Each pop stage below is chained onto this future; it starts already completed with 0.
    CompletableFuture<Long> getMessageFuture = CompletableFuture.completedFuture(0L);
    // Retry topic first, when the random draw selected it (never for ordered requests).
    if (needRetry && !requestHeader.isOrder()) {
        if (needRetryV1) {
            String retryTopic = KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup());
            getMessageFuture = popMsgFromTopic(retryTopic, true, getMessageResult, requestHeader, reviveQid, channel,
                popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture);
        } else {
            String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
            getMessageFuture = popMsgFromTopic(retryTopic, true, getMessageResult, requestHeader, reviveQid, channel,
                popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture);
        }
    }
    if (requestHeader.getQueueId() < 0) {
        // read all queue
        getMessageFuture = popMsgFromTopic(topicConfig, false, getMessageResult, requestHeader, reviveQid, channel,
            popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture);
    } else {
        int queueId = requestHeader.getQueueId();
        getMessageFuture = getMessageFuture.thenCompose(restNum ->
            popMsgFromQueue(topicConfig.getTopicName(), requestHeader.getAttemptId(), false,
                getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
                startOffsetInfo, msgOffsetInfo, orderCountInfo));
    }
    // if not full , fetch retry again
    // NOTE(review): this size check runs synchronously while the futures chained above are not
    // necessarily complete yet, so it may see fewer messages than will eventually be collected.
    if (!needRetry && getMessageResult.getMessageMapedList().size() < requestHeader.getMaxMsgNums() && !requestHeader.isOrder()) {
        if (needRetryV1) {
            String retryTopicV1 = KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup());
            getMessageFuture = popMsgFromTopic(retryTopicV1, true, getMessageResult, requestHeader, reviveQid, channel,
                popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture);
        } else {
            String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
            getMessageFuture = popMsgFromTopic(retryTopic, true, getMessageResult, requestHeader, reviveQid, channel,
                popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture);
        }
    }
    final RemotingCommand finalResponse = response;
    getMessageFuture.thenApply(restNum -> {
        // Run and clear any callbacks attached to the request; a failing callback is only logged.
        try {
            if (request.getCallbackList() != null) {
                request.getCallbackList().forEach(CommandCallback::accept);
                request.getCallbackList().clear();
            }
        } catch (Throwable t) {
            POP_LOGGER.error("PopProcessor execute callback error", t);
        }
        if (!getMessageResult.getMessageBufferList().isEmpty()) {
            finalResponse.setCode(ResponseCode.SUCCESS);
            getMessageResult.setStatus(GetMessageStatus.FOUND);
            if (restNum > 0) {
                // all queue pop can not notify specified queue pop, and vice versa
                popLongPollingService.notifyMessageArriving(
                    requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getConsumerGroup(),
                    null, 0L, null, null);
            }
        } else {
            // Nothing was popped: try to hand the request to the long-polling service.
            PollingResult pollingResult = popLongPollingService.polling(
                ctx, request, new PollingHeader(requestHeader), finalSubscriptionData, finalMessageFilter);
            if (PollingResult.POLLING_SUC == pollingResult) {
                if (restNum > 0) {
                    popLongPollingService.notifyMessageArriving(
                        requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getConsumerGroup(),
                        null, 0L, null, null);
                }
                // The request was accepted by long polling, so no response is built here; null is
                // handed to the writeResponse stage below.
                return null;
            } else if (PollingResult.POLLING_FULL == pollingResult) {
                finalResponse.setCode(ResponseCode.POLLING_FULL);
            } else {
                finalResponse.setCode(ResponseCode.POLLING_TIMEOUT);
            }
            getMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
        }
        responseHeader.setInvisibleTime(requestHeader.getInvisibleTime());
        responseHeader.setPopTime(popTime);
        responseHeader.setReviveQid(reviveQid);
        responseHeader.setRestNum(restNum);
        responseHeader.setStartOffsetInfo(startOffsetInfo.toString());
        responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString());
        if (requestHeader.isOrder() && orderCountInfo != null) {
            responseHeader.setOrderCountInfo(orderCountInfo.toString());
        }
        finalResponse.setRemark(getMessageResult.getStatus().name());
        switch (finalResponse.getCode()) {
            case ResponseCode.SUCCESS:
                if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                    // Heap transfer: read the messages into a byte array and attach it as the body.
                    final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(),
                        requestHeader.getTopic(), requestHeader.getQueueId());
                    this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
                        requestHeader.getTopic(), requestHeader.getQueueId(),
                        (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
                    finalResponse.setBody(r);
                } else {
                    // Direct transfer: header and message buffers are written to the channel right
                    // here, so null is returned instead of the response.
                    final GetMessageResult tmpGetMessageResult = getMessageResult;
                    try {
                        FileRegion fileRegion =
                            new ManyMessageTransfer(finalResponse.encodeHeader(getMessageResult.getBufferTotalSize()),
                                getMessageResult);
                        channel.writeAndFlush(fileRegion)
                            .addListener((ChannelFutureListener) future -> {
                                // Release the message buffers once the write has finished, success or not.
                                tmpGetMessageResult.release();
                                Attributes attributes = RemotingMetricsManager.newAttributesBuilder()
                                    .put(LABEL_REQUEST_CODE, RemotingHelper.getRequestCodeDesc(request.getCode()))
                                    .put(LABEL_RESPONSE_CODE, RemotingHelper.getResponseCodeDesc(finalResponse.getCode()))
                                    .put(LABEL_RESULT, RemotingMetricsManager.getWriteAndFlushResult(future))
                                    .build();
                                RemotingMetricsManager.rpcLatency.record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes);
                                if (!future.isSuccess()) {
                                    POP_LOGGER.error("Fail to transfer messages from page cache to {}",
                                        channel.remoteAddress(), future.cause());
                                }
                            });
                    } catch (Throwable e) {
                        POP_LOGGER.error("Error occurred when transferring messages from page cache", e);
                        getMessageResult.release();
                    }
                    return null;
                }
                break;
            default:
                return finalResponse;
        }
        return finalResponse;
    }).thenAccept(result -> NettyRemotingAbstract.writeResponse(channel, request, result));
    return null;
}