private RemotingCommand processRequest()

in broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java [302:603]


    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend,
        boolean brokerAllowFlowCtrSuspend)
        throws RemotingCommandException {
        final long beginTimeMills = this.brokerController.getMessageStore().now();
        RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
        final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
        final PullMessageRequestHeader requestHeader =
            (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

        response.setOpaque(request.getOpaque());

        LOGGER.debug("receive PullMessage request command, {}", request);

        if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            responseHeader.setForbiddenType(ForbiddenType.BROKER_FORBIDDEN);
            response.setRemark(String.format("the broker[%s] pulling message is forbidden",
                this.brokerController.getBrokerConfig().getBrokerIP1()));
            return response;
        }

        if (request.getCode() == RequestCode.LITE_PULL_MESSAGE && !this.brokerController.getBrokerConfig().isLitePullMessageEnable()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            responseHeader.setForbiddenType(ForbiddenType.BROKER_FORBIDDEN);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] for lite pull consumer is forbidden");
            return response;
        }

        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
        if (null == subscriptionGroupConfig) {
            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
            response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
            return response;
        }

        if (!subscriptionGroupConfig.isConsumeEnable()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            responseHeader.setForbiddenType(ForbiddenType.GROUP_FORBIDDEN);
            response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
            return response;
        }

        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
        if (null == topicConfig) {
            LOGGER.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
            response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
            return response;
        }

        if (!PermName.isReadable(topicConfig.getPerm())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            responseHeader.setForbiddenType(ForbiddenType.TOPIC_FORBIDDEN);
            response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
            return response;
        }

        TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);

        {
            RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
            if (rewriteResult != null) {
                return rewriteResult;
            }
        }

        if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
            String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
                requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
            LOGGER.warn(errorInfo);
            response.setCode(ResponseCode.INVALID_PARAMETER);
            response.setRemark(errorInfo);
            return response;
        }

        ConsumerManager consumerManager = brokerController.getConsumerManager();
        switch (RequestSource.parseInteger(requestHeader.getRequestSource())) {
            case PROXY_FOR_BROADCAST:
                consumerManager.compensateBasicConsumerInfo(requestHeader.getConsumerGroup(), ConsumeType.CONSUME_PASSIVELY, MessageModel.BROADCASTING);
                break;
            case PROXY_FOR_STREAM:
                consumerManager.compensateBasicConsumerInfo(requestHeader.getConsumerGroup(), ConsumeType.CONSUME_ACTIVELY, MessageModel.CLUSTERING);
                break;
            default:
                consumerManager.compensateBasicConsumerInfo(requestHeader.getConsumerGroup(), ConsumeType.CONSUME_PASSIVELY, MessageModel.CLUSTERING);
                break;
        }

        SubscriptionData subscriptionData = null;
        ConsumerFilterData consumerFilterData = null;
        final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
        if (hasSubscriptionFlag) {
            try {
                subscriptionData = FilterAPI.build(
                    requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
                );
                consumerManager.compensateSubscribeData(requestHeader.getConsumerGroup(), requestHeader.getTopic(), subscriptionData);

                if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                    consumerFilterData = ConsumerFilterManager.build(
                        requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
                        requestHeader.getExpressionType(), requestHeader.getSubVersion()
                    );
                    assert consumerFilterData != null;
                }
            } catch (Exception e) {
                LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
                    requestHeader.getConsumerGroup());
                response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
                response.setRemark("parse the consumer's subscription failed");
                return response;
            }
        } else {
            ConsumerGroupInfo consumerGroupInfo =
                this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
            if (null == consumerGroupInfo) {
                LOGGER.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
                response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
                response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
                return response;
            }

            if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
                && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
                response.setCode(ResponseCode.NO_PERMISSION);
                responseHeader.setForbiddenType(ForbiddenType.BROADCASTING_DISABLE_FORBIDDEN);
                response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
                return response;
            }

            boolean readForbidden = this.brokerController.getSubscriptionGroupManager().getForbidden(//
                subscriptionGroupConfig.getGroupName(), requestHeader.getTopic(), PermName.INDEX_PERM_READ);
            if (readForbidden) {
                response.setCode(ResponseCode.NO_PERMISSION);
                responseHeader.setForbiddenType(ForbiddenType.SUBSCRIPTION_FORBIDDEN);
                response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] is forbidden for topic[" + requestHeader.getTopic() + "]");
                return response;
            }

            subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
            if (null == subscriptionData) {
                LOGGER.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
                response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
                response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
                return response;
            }

            if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
                LOGGER.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
                    subscriptionData.getSubString());
                response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
                response.setRemark("the consumer's subscription not latest");
                return response;
            }
            if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),
                    requestHeader.getConsumerGroup());
                if (consumerFilterData == null) {
                    response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);
                    response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
                    return response;
                }
                if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
                    LOGGER.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",
                        requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
                    response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
                    response.setRemark("the consumer's consumer filter data not latest");
                    return response;
                }
            }
        }

        if (!ExpressionType.isTagType(subscriptionData.getExpressionType())
            && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
            return response;
        }

        MessageFilter messageFilter;
        if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
            messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
                this.brokerController.getConsumerFilterManager());
        } else {
            messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
                this.brokerController.getConsumerFilterManager());
        }

        if (brokerController.getBrokerConfig().isRejectPullConsumerEnable()) {
            ConsumerGroupInfo consumerGroupInfo =
                    this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
            if (null == consumerGroupInfo || ConsumeType.CONSUME_ACTIVELY == consumerGroupInfo.getConsumeType()) {
                if ((null == consumerGroupInfo || null == consumerGroupInfo.findChannel(channel))
                        && !MixAll.isSysConsumerGroupPullMessage(requestHeader.getConsumerGroup())) {
                    response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
                    response.setRemark("the consumer's group info not exist, or the pull consumer is rejected by server." + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
                    return response;
                }
            }
        }

        final MessageStore messageStore = brokerController.getMessageStore();
        if (this.brokerController.getMessageStore() instanceof DefaultMessageStore) {
            DefaultMessageStore defaultMessageStore = (DefaultMessageStore) this.brokerController.getMessageStore();
            boolean cgNeedColdDataFlowCtr = brokerController.getColdDataCgCtrService().isCgNeedColdDataFlowCtr(requestHeader.getConsumerGroup());
            if (cgNeedColdDataFlowCtr) {
                boolean isMsgLogicCold = defaultMessageStore.getCommitLog()
                    .getColdDataCheckService().isMsgInColdArea(requestHeader.getConsumerGroup(),
                        requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getQueueOffset());
                if (isMsgLogicCold) {
                    ConsumeType consumeType = this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup()).getConsumeType();
                    if (consumeType == ConsumeType.CONSUME_PASSIVELY) {
                        response.setCode(ResponseCode.SYSTEM_BUSY);
                        response.setRemark("This consumer group is reading cold data. It has been flow control");
                        return response;
                    } else if (consumeType == ConsumeType.CONSUME_ACTIVELY) {
                        if (brokerAllowFlowCtrSuspend) {  // second arrived, which will not be held
                            PullRequest pullRequest = new PullRequest(request, channel, 1000,
                                this.brokerController.getMessageStore().now(), requestHeader.getQueueOffset(), subscriptionData, messageFilter);
                            this.brokerController.getColdDataPullRequestHoldService().suspendColdDataReadRequest(pullRequest);
                            return null;
                        }
                        requestHeader.setMaxMsgNums(1);
                    }
                }
            }
        }

        final boolean useResetOffsetFeature = brokerController.getBrokerConfig().isUseServerSideResetOffset();
        String topic = requestHeader.getTopic();
        String group = requestHeader.getConsumerGroup();
        int queueId = requestHeader.getQueueId();
        Long resetOffset = brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(topic, group, queueId);

        GetMessageResult getMessageResult = null;
        if (useResetOffsetFeature && null != resetOffset) {
            getMessageResult = new GetMessageResult();
            getMessageResult.setStatus(GetMessageStatus.OFFSET_RESET);
            getMessageResult.setNextBeginOffset(resetOffset);
            getMessageResult.setMinOffset(messageStore.getMinOffsetInQueue(topic, queueId));
            try {
                getMessageResult.setMaxOffset(messageStore.getMaxOffsetInQueue(topic, queueId));
            } catch (ConsumeQueueException e) {
                throw new RemotingCommandException("Failed tp get max offset in queue", e);
            }
            getMessageResult.setSuggestPullingFromSlave(false);
        } else {
            long broadcastInitOffset = queryBroadcastPullInitOffset(topic, group, queueId, requestHeader, channel);
            if (broadcastInitOffset >= 0) {
                getMessageResult = new GetMessageResult();
                getMessageResult.setStatus(GetMessageStatus.OFFSET_RESET);
                getMessageResult.setNextBeginOffset(broadcastInitOffset);
            } else {
                SubscriptionData finalSubscriptionData = subscriptionData;
                RemotingCommand finalResponse = response;
                messageStore.getMessageAsync(group, topic, queueId, requestHeader.getQueueOffset(),
                        requestHeader.getMaxMsgNums(), messageFilter)
                    .thenApply(result -> {
                        if (null == result) {
                            finalResponse.setCode(ResponseCode.SYSTEM_ERROR);
                            finalResponse.setRemark("store getMessage return null");
                            return finalResponse;
                        }
                        brokerController.getColdDataCgCtrService().coldAcc(requestHeader.getConsumerGroup(), result.getColdDataSum());
                        return pullMessageResultHandler.handle(
                            result,
                            request,
                            requestHeader,
                            channel,
                            finalSubscriptionData,
                            subscriptionGroupConfig,
                            brokerAllowSuspend,
                            messageFilter,
                            finalResponse,
                            mappingContext,
                            beginTimeMills
                        );
                    })
                    .thenAccept(result -> NettyRemotingAbstract.writeResponse(channel, request, result));
            }
        }

        if (getMessageResult != null) {

            return this.pullMessageResultHandler.handle(
                getMessageResult,
                request,
                requestHeader,
                channel,
                subscriptionData,
                subscriptionGroupConfig,
                brokerAllowSuspend,
                messageFilter,
                response,
                mappingContext,
                beginTimeMills
            );
        }
        return null;
    }