protected RemotingCommand consumerSendMsgBack()

in broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java [92:334]


    protected RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
        throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final ConsumerSendMsgBackRequestHeader requestHeader =
            (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);

        // The send back requests sent to SlaveBroker will be forwarded to the master broker beside
        final BrokerController masterBroker = this.brokerController.peekMasterBroker();
        if (null == masterBroker) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("no master available along with " + brokerController.getBrokerConfig().getBrokerIP1());
            return response;
        }

        // The broker that received the request.
        // It may be a master broker or a slave broker
        final BrokerController currentBroker = this.brokerController;

        SubscriptionGroupConfig subscriptionGroupConfig =
            masterBroker.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
        if (null == subscriptionGroupConfig) {
            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
            response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
                + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
            return response;
        }

        BrokerConfig masterBrokerConfig = masterBroker.getBrokerConfig();
        if (!PermName.isWriteable(masterBrokerConfig.getBrokerPermission())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the broker[" + masterBrokerConfig.getBrokerIP1() + "] sending message is forbidden");
            return response;
        }

        if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }

        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
        int queueIdInt = this.random.nextInt(subscriptionGroupConfig.getRetryQueueNums());

        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
        }

        // Create retry topic to master broker
        TopicConfig topicConfig = masterBroker.getTopicConfigManager().createTopicInSendMessageBackMethod(
            newTopic,
            subscriptionGroupConfig.getRetryQueueNums(),
            PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        if (null == topicConfig) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("topic[" + newTopic + "] not exist");
            return response;
        }

        if (!PermName.isWriteable(topicConfig.getPerm())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
            return response;
        }

        // Look message from the origin message store
        MessageExt msgExt = currentBroker.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
        if (null == msgExt) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("look message by offset failed, " + requestHeader.getOffset());
            return response;
        }

        final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
        if (null == retryTopic) {
            MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
        }
        msgExt.setWaitStoreMsgOK(false);

        int delayLevel = requestHeader.getDelayLevel();

        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
            Integer times = requestHeader.getMaxReconsumeTimes();
            if (times != null) {
                maxReconsumeTimes = times;
            }
        }

        boolean isDLQ = false;
        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
            || delayLevel < 0) {

            Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
                .put(LABEL_CONSUMER_GROUP, requestHeader.getGroup())
                .put(LABEL_TOPIC, requestHeader.getOriginTopic())
                .put(LABEL_IS_SYSTEM, BrokerMetricsManager.isSystem(requestHeader.getOriginTopic(), requestHeader.getGroup()))
                .build();
            BrokerMetricsManager.sendToDlqMessages.add(1, attributes);

            isDLQ = true;
            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
            queueIdInt = randomQueueId(DLQ_NUMS_PER_GROUP);

            // Create DLQ topic to master broker
            topicConfig = masterBroker.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                DLQ_NUMS_PER_GROUP,
                PermName.PERM_WRITE | PermName.PERM_READ, 0);

            if (null == topicConfig) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("topic[" + newTopic + "] not exist");
                return response;
            }
            msgExt.setDelayTimeLevel(0);
        } else {
            if (0 == delayLevel) {
                delayLevel = 3 + msgExt.getReconsumeTimes();
            }

            msgExt.setDelayTimeLevel(delayLevel);
        }

        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(newTopic);
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

        msgInner.setQueueId(queueIdInt);
        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

        String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
        MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

        boolean succeeded = false;

        // Put retry topic to master message store
        PutMessageResult putMessageResult = masterBroker.getMessageStore().putMessage(msgInner);
        if (putMessageResult != null) {
            String commercialOwner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);

            switch (putMessageResult.getPutMessageStatus()) {
                case PUT_OK:
                    String backTopic = msgExt.getTopic();
                    String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
                    if (correctTopic != null) {
                        backTopic = correctTopic;
                    }
                    if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(msgInner.getTopic())) {
                        masterBroker.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
                        masterBroker.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
                        masterBroker.getBrokerStatsManager().incQueuePutNums(msgInner.getTopic(), msgInner.getQueueId());
                        masterBroker.getBrokerStatsManager().incQueuePutSize(msgInner.getTopic(), msgInner.getQueueId(), putMessageResult.getAppendMessageResult().getWroteBytes());
                    }
                    masterBroker.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);

                    if (isDLQ) {
                        masterBroker.getBrokerStatsManager().incDLQStatValue(
                            BrokerStatsManager.SNDBCK2DLQ_TIMES,
                            commercialOwner,
                            requestHeader.getGroup(),
                            requestHeader.getOriginTopic(),
                            BrokerStatsManager.StatsType.SEND_BACK_TO_DLQ.name(),
                            1);

                        String uniqKey = msgInner.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                        DLQ_LOG.info("send msg to DLQ {}, owner={}, originalTopic={}, consumerId={}, msgUniqKey={}, storeTimestamp={}",
                            newTopic,
                            commercialOwner,
                            requestHeader.getOriginTopic(),
                            requestHeader.getGroup(),
                            uniqKey,
                            putMessageResult.getAppendMessageResult().getStoreTimestamp());
                    }

                    response.setCode(ResponseCode.SUCCESS);
                    response.setRemark(null);

                    succeeded = true;
                    break;
                default:
                    break;
            }

            if (!succeeded) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(putMessageResult.getPutMessageStatus().name());
            }
        } else {
            if (isDLQ) {
                String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
                String uniqKey = msgInner.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                DLQ_LOG.info("failed to send msg to DLQ {}, owner={}, originalTopic={}, consumerId={}, msgUniqKey={}, result={}",
                    newTopic,
                    owner,
                    requestHeader.getOriginTopic(),
                    requestHeader.getGroup(),
                    uniqKey,
                    "null");
            }

            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("putMessageResult is null");
        }

        if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
            String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
            ConsumeMessageContext context = new ConsumeMessageContext();
            context.setNamespace(namespace);
            context.setTopic(requestHeader.getOriginTopic());
            context.setConsumerGroup(requestHeader.getGroup());
            context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
            context.setCommercialRcvTimes(1);
            context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));

            context.setAccountAuthType(request.getExtFields().get(BrokerStatsManager.ACCOUNT_AUTH_TYPE));
            context.setAccountOwnerParent(request.getExtFields().get(BrokerStatsManager.ACCOUNT_OWNER_PARENT));
            context.setAccountOwnerSelf(request.getExtFields().get(BrokerStatsManager.ACCOUNT_OWNER_SELF));
            context.setRcvStat(isDLQ ? BrokerStatsManager.StatsType.SEND_BACK_TO_DLQ : BrokerStatsManager.StatsType.SEND_BACK);
            context.setSuccess(succeeded);
            context.setRcvMsgNum(1);
            //Set msg body size 0 when sent back by consumer.
            context.setRcvMsgSize(0);
            context.setCommercialRcvMsgNum(succeeded ? 1 : 0);

            try {
                this.executeConsumeMessageHookAfter(context);
            } catch (AbortProcessException e) {
                response.setCode(e.getResponseCode());
                response.setRemark(e.getErrorMessage());
            }
        }

        return response;
    }