in broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java [302:603]
/**
 * Handles one pull-message request: validates it, applies cold-data flow control and then either
 * answers with a reset offset or reads messages from the store.
 * <p>
 * The checks run in this order and each failure returns {@code response} immediately with the
 * matching response code: broker readable, lite-pull enabled, subscription group present and
 * consumable, topic present and readable, static-topic rewrite, queue id in range, subscription
 * data / filter data resolved, property filter enabled, pull consumer not rejected.
 * <p>
 * When the messages are read through {@code getMessageAsync} the response is written to the channel
 * from the future's callback and this method returns {@code null}. {@code null} is also returned
 * when the request is handed to the cold-data hold service.
 *
 * @param channel channel the request arrived on; also used to write the asynchronous response
 * @param request the raw pull request command
 * @param brokerAllowSuspend forwarded unchanged to {@code pullMessageResultHandler}
 * @param brokerAllowFlowCtrSuspend whether an actively pulling consumer that reads cold data may be
 * handed to the cold-data hold service; when {@code false} its batch size is capped at one message
 * @return the response to send back, or {@code null} when the response is produced elsewhere
 * (asynchronous store read, or request suspended by cold-data flow control)
 * @throws RemotingCommandException if the max offset of the queue cannot be read while answering a
 * server-side offset reset (presumably also when the request header cannot be decoded)
 */
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend,
    boolean brokerAllowFlowCtrSuspend)
    throws RemotingCommandException {
    final long beginTimeMills = this.brokerController.getMessageStore().now();
    RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
    final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
    final PullMessageRequestHeader requestHeader =
        (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

    response.setOpaque(request.getOpaque());

    LOGGER.debug("receive PullMessage request command, {}", request);

    // ---- Broker level permission ----
    if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        responseHeader.setForbiddenType(ForbiddenType.BROKER_FORBIDDEN);
        response.setRemark(String.format("the broker[%s] pulling message is forbidden",
            this.brokerController.getBrokerConfig().getBrokerIP1()));
        return response;
    }

    if (request.getCode() == RequestCode.LITE_PULL_MESSAGE && !this.brokerController.getBrokerConfig().isLitePullMessageEnable()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        responseHeader.setForbiddenType(ForbiddenType.BROKER_FORBIDDEN);
        response.setRemark(
            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] for lite pull consumer is forbidden");
        return response;
    }

    // ---- Subscription group ----
    SubscriptionGroupConfig subscriptionGroupConfig =
        this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
    if (null == subscriptionGroupConfig) {
        response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
        response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
        return response;
    }

    if (!subscriptionGroupConfig.isConsumeEnable()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        responseHeader.setForbiddenType(ForbiddenType.GROUP_FORBIDDEN);
        response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
        return response;
    }

    // ---- Topic ----
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    if (null == topicConfig) {
        LOGGER.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
        return response;
    }

    if (!PermName.isReadable(topicConfig.getPerm())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        responseHeader.setForbiddenType(ForbiddenType.TOPIC_FORBIDDEN);
        response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
        return response;
    }

    // ---- Static topic: a non-null rewrite result is the final answer for this request ----
    TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);

    {
        RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
        if (rewriteResult != null) {
            return rewriteResult;
        }
    }

    if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
        String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
            requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
        LOGGER.warn(errorInfo);
        response.setCode(ResponseCode.INVALID_PARAMETER);
        response.setRemark(errorInfo);
        return response;
    }

    // ---- Compensate basic consumer info according to where the request came from ----
    ConsumerManager consumerManager = brokerController.getConsumerManager();
    switch (RequestSource.parseInteger(requestHeader.getRequestSource())) {
        case PROXY_FOR_BROADCAST:
            consumerManager.compensateBasicConsumerInfo(requestHeader.getConsumerGroup(), ConsumeType.CONSUME_PASSIVELY, MessageModel.BROADCASTING);
            break;
        case PROXY_FOR_STREAM:
            consumerManager.compensateBasicConsumerInfo(requestHeader.getConsumerGroup(), ConsumeType.CONSUME_ACTIVELY, MessageModel.CLUSTERING);
            break;
        default:
            consumerManager.compensateBasicConsumerInfo(requestHeader.getConsumerGroup(), ConsumeType.CONSUME_PASSIVELY, MessageModel.CLUSTERING);
            break;
    }

    // ---- Resolve subscription data and, for non-tag expressions, the consumer filter data ----
    SubscriptionData subscriptionData = null;
    ConsumerFilterData consumerFilterData = null;
    final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
    if (hasSubscriptionFlag) {
        // The request carries its own subscription: build everything from the request header.
        try {
            subscriptionData = FilterAPI.build(
                requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
            );
            consumerManager.compensateSubscribeData(requestHeader.getConsumerGroup(), requestHeader.getTopic(), subscriptionData);

            if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                consumerFilterData = ConsumerFilterManager.build(
                    requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
                    requestHeader.getExpressionType(), requestHeader.getSubVersion()
                );
                // An explicit check instead of `assert`: assertions are disabled by default at runtime,
                // which would let a null filter reach the message filter below.
                if (null == consumerFilterData) {
                    LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
                        requestHeader.getConsumerGroup());
                    response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
                    response.setRemark("parse the consumer's subscription failed");
                    return response;
                }
            }
        } catch (Exception e) {
            // Pass the exception as the trailing argument so the cause is not lost.
            LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
                requestHeader.getConsumerGroup(), e);
            response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
            response.setRemark("parse the consumer's subscription failed");
            return response;
        }
    } else {
        // No subscription in the request: rely on what the broker already knows about the group.
        ConsumerGroupInfo consumerGroupInfo =
            this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
        if (null == consumerGroupInfo) {
            LOGGER.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
            response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
            return response;
        }

        if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
            && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
            response.setCode(ResponseCode.NO_PERMISSION);
            responseHeader.setForbiddenType(ForbiddenType.BROADCASTING_DISABLE_FORBIDDEN);
            response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
            return response;
        }

        boolean readForbidden = this.brokerController.getSubscriptionGroupManager().getForbidden(
            subscriptionGroupConfig.getGroupName(), requestHeader.getTopic(), PermName.INDEX_PERM_READ);
        if (readForbidden) {
            response.setCode(ResponseCode.NO_PERMISSION);
            responseHeader.setForbiddenType(ForbiddenType.SUBSCRIPTION_FORBIDDEN);
            response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] is forbidden for topic[" + requestHeader.getTopic() + "]");
            return response;
        }

        subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
        if (null == subscriptionData) {
            LOGGER.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
            response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
            return response;
        }

        // The broker must hold a subscription at least as new as the one the client pulls with.
        if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
            LOGGER.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
                subscriptionData.getSubString());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
            response.setRemark("the consumer's subscription not latest");
            return response;
        }

        if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
            consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),
                requestHeader.getConsumerGroup());
            if (consumerFilterData == null) {
                response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);
                response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
                return response;
            }
            if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
                LOGGER.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",
                    requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
                response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
                response.setRemark("the consumer's consumer filter data not latest");
                return response;
            }
        }
    }

    if (!ExpressionType.isTagType(subscriptionData.getExpressionType())
        && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
        return response;
    }

    MessageFilter messageFilter;
    if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
        messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    } else {
        messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    }

    // ---- Optionally reject pull consumers whose channel is not registered with the group ----
    if (brokerController.getBrokerConfig().isRejectPullConsumerEnable()) {
        ConsumerGroupInfo consumerGroupInfo =
            this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
        if (null == consumerGroupInfo || ConsumeType.CONSUME_ACTIVELY == consumerGroupInfo.getConsumeType()) {
            if ((null == consumerGroupInfo || null == consumerGroupInfo.findChannel(channel))
                && !MixAll.isSysConsumerGroupPullMessage(requestHeader.getConsumerGroup())) {
                response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
                response.setRemark("the consumer's group info not exist, or the pull consumer is rejected by server." + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
                return response;
            }
        }
    }

    // ---- Cold-data flow control (only evaluated for DefaultMessageStore) ----
    final MessageStore messageStore = brokerController.getMessageStore();
    if (messageStore instanceof DefaultMessageStore) {
        DefaultMessageStore defaultMessageStore = (DefaultMessageStore) messageStore;
        boolean cgNeedColdDataFlowCtr = brokerController.getColdDataCgCtrService().isCgNeedColdDataFlowCtr(requestHeader.getConsumerGroup());
        if (cgNeedColdDataFlowCtr) {
            boolean isMsgLogicCold = defaultMessageStore.getCommitLog()
                .getColdDataCheckService().isMsgInColdArea(requestHeader.getConsumerGroup(),
                    requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getQueueOffset());
            if (isMsgLogicCold) {
                // The group may be unknown here (the lookup is treated as nullable elsewhere in this
                // method). Treat a missing group as an actively pulling consumer, the same way the
                // reject-pull-consumer check above does, so the request is still throttled instead
                // of failing with a NullPointerException.
                ConsumerGroupInfo coldReadGroupInfo =
                    this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
                ConsumeType consumeType = null == coldReadGroupInfo
                    ? ConsumeType.CONSUME_ACTIVELY
                    : coldReadGroupInfo.getConsumeType();
                if (consumeType == ConsumeType.CONSUME_PASSIVELY) {
                    response.setCode(ResponseCode.SYSTEM_BUSY);
                    response.setRemark("This consumer group is reading cold data. It has been flow control");
                    return response;
                } else if (consumeType == ConsumeType.CONSUME_ACTIVELY) {
                    // Hold the request only while suspension is allowed; a request arriving with the
                    // flag cleared (its second arrival) is not held again.
                    if (brokerAllowFlowCtrSuspend) {
                        PullRequest pullRequest = new PullRequest(request, channel, 1000,
                            messageStore.now(), requestHeader.getQueueOffset(), subscriptionData, messageFilter);
                        this.brokerController.getColdDataPullRequestHoldService().suspendColdDataReadRequest(pullRequest);
                        return null;
                    }
                    // Not allowed to hold: let it through but cap the batch to a single message.
                    requestHeader.setMaxMsgNums(1);
                }
            }
        }
    }

    // ---- Produce the result: server-side reset offset, broadcast init offset, or a store read ----
    final boolean useResetOffsetFeature = brokerController.getBrokerConfig().isUseServerSideResetOffset();

    String topic = requestHeader.getTopic();
    String group = requestHeader.getConsumerGroup();
    int queueId = requestHeader.getQueueId();
    Long resetOffset = brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(topic, group, queueId);

    GetMessageResult getMessageResult = null;
    if (useResetOffsetFeature && null != resetOffset) {
        getMessageResult = new GetMessageResult();
        getMessageResult.setStatus(GetMessageStatus.OFFSET_RESET);
        getMessageResult.setNextBeginOffset(resetOffset);
        getMessageResult.setMinOffset(messageStore.getMinOffsetInQueue(topic, queueId));
        try {
            getMessageResult.setMaxOffset(messageStore.getMaxOffsetInQueue(topic, queueId));
        } catch (ConsumeQueueException e) {
            throw new RemotingCommandException("Failed to get max offset in queue", e);
        }
        getMessageResult.setSuggestPullingFromSlave(false);
    } else {
        long broadcastInitOffset = queryBroadcastPullInitOffset(topic, group, queueId, requestHeader, channel);
        if (broadcastInitOffset >= 0) {
            getMessageResult = new GetMessageResult();
            getMessageResult.setStatus(GetMessageStatus.OFFSET_RESET);
            getMessageResult.setNextBeginOffset(broadcastInitOffset);
        } else {
            // Lambdas need effectively-final copies of the reassigned locals.
            SubscriptionData finalSubscriptionData = subscriptionData;
            RemotingCommand finalResponse = response;
            messageStore.getMessageAsync(group, topic, queueId, requestHeader.getQueueOffset(),
                    requestHeader.getMaxMsgNums(), messageFilter)
                .thenApply(result -> {
                    if (null == result) {
                        finalResponse.setCode(ResponseCode.SYSTEM_ERROR);
                        finalResponse.setRemark("store getMessage return null");
                        return finalResponse;
                    }
                    brokerController.getColdDataCgCtrService().coldAcc(requestHeader.getConsumerGroup(), result.getColdDataSum());
                    return pullMessageResultHandler.handle(
                        result,
                        request,
                        requestHeader,
                        channel,
                        finalSubscriptionData,
                        subscriptionGroupConfig,
                        brokerAllowSuspend,
                        messageFilter,
                        finalResponse,
                        mappingContext,
                        beginTimeMills
                    );
                })
                // Nobody observes this future, so a failed read or a throwing handler would otherwise
                // be dropped silently and no response would be written for this request.
                .exceptionally(throwable -> {
                    LOGGER.error("Failed to pull message asynchronously, group: {}, topic: {}, queueId: {}",
                        group, topic, queueId, throwable);
                    finalResponse.setCode(ResponseCode.SYSTEM_ERROR);
                    finalResponse.setRemark("pull message failed, " + throwable);
                    return finalResponse;
                })
                .thenAccept(result -> NettyRemotingAbstract.writeResponse(channel, request, result));
        }
    }

    if (getMessageResult != null) {
        return this.pullMessageResultHandler.handle(
            getMessageResult,
            request,
            requestHeader,
            channel,
            subscriptionData,
            subscriptionGroupConfig,
            brokerAllowSuspend,
            messageFilter,
            response,
            mappingContext,
            beginTimeMills
        );
    }
    // The asynchronous branch answers through the channel; there is nothing to return here.
    return null;
}