private RemotingCommand handlePutMessageResult()

in broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java [367:545]


    /**
     * Converts the outcome of a store put into the reply for a send request, updating broker
     * statistics and the send-hook context along the way.
     *
     * <p>Statuses {@code PUT_OK}, {@code FLUSH_DISK_TIMEOUT}, {@code FLUSH_SLAVE_TIMEOUT} and
     * {@code SLAVE_NOT_AVAILABLE} are treated as a successful send; every other status only sets
     * an error code and remark on {@code response}.
     *
     * @param putMessageResult result returned by the store; {@code null} yields {@code SYSTEM_ERROR}
     * @param response command whose code and remark are filled in
     * @param request original send request; its ext fields and body are read here
     * @param msg message that was put
     * @param responseHeader header populated with message id, queue id, queue offset and transaction id on success
     * @param sendMessageContext context populated only when {@code hasSendMessageHook()} is true
     * @param ctx channel context handed to {@code doResponse} on success
     * @param queueIdInt queue id reported in the response header and the latency statistic
     * @param beginTimeMillis start time used to compute the put latency
     * @param mappingContext mapping context passed to {@code rewriteResponseForStaticTopic}
     * @param messageType message type used as a metrics label
     * @return on success, the non-null result of {@code rewriteResponseForStaticTopic} if there is one,
     *         otherwise {@code null} after {@code response} has been passed to {@code doResponse};
     *         on failure (or a {@code null} put result), {@code response} itself
     */
    private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
        RemotingCommand request, MessageExt msg, SendMessageResponseHeader responseHeader,
        SendMessageContext sendMessageContext, ChannelHandlerContext ctx, int queueIdInt, long beginTimeMillis,
        TopicQueueMappingContext mappingContext, TopicMessageType messageType) {
        if (putMessageResult == null) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("store putMessage return null");
            return response;
        }

        // Step 1: map the store status to a response code (and a remark for failures).
        boolean stored = false;
        switch (putMessageResult.getPutMessageStatus()) {
            // Statuses counted as a successful send
            case PUT_OK:
                response.setCode(ResponseCode.SUCCESS);
                stored = true;
                break;
            case FLUSH_DISK_TIMEOUT:
                response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
                stored = true;
                break;
            case FLUSH_SLAVE_TIMEOUT:
                response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
                stored = true;
                break;
            case SLAVE_NOT_AVAILABLE:
                response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
                stored = true;
                break;

            // Statuses counted as a failed send
            case IN_SYNC_REPLICAS_NOT_ENOUGH:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("in-sync replicas not enough");
                break;
            case CREATE_MAPPED_FILE_FAILED:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("create mapped file failed, server is busy or broken.");
                break;
            case MESSAGE_ILLEGAL:
            case PROPERTIES_SIZE_EXCEEDED:
                response.setCode(ResponseCode.MESSAGE_ILLEGAL);
                response.setRemark(String.format("the message is illegal, maybe msg body or properties length not matched. msg body length limit %dB, msg properties length limit 32KB.",
                    this.brokerController.getMessageStoreConfig().getMaxMessageSize()));
                break;
            case WHEEL_TIMER_MSG_ILLEGAL:
                response.setCode(ResponseCode.MESSAGE_ILLEGAL);
                response.setRemark(String.format("timer message illegal, the delay time should not be bigger than the max delay %dms; or if set del msg, the delay time should be bigger than the current time",
                    this.brokerController.getMessageStoreConfig().getTimerMaxDelaySec() * 1000L));
                break;
            case WHEEL_TIMER_FLOW_CONTROL: {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                final int congestNumEachSlot = this.brokerController.getMessageStoreConfig().getTimerCongestNumEachSlot();
                // Widen to long before doubling so the reported limit cannot overflow.
                final long congestNumLimit = congestNumEachSlot * 2L;
                response.setRemark(String.format("timer message is under flow control, max num limit is %d or the current value is greater than %d and less than %d, trigger random flow control",
                    congestNumLimit, congestNumEachSlot, congestNumLimit));
                break;
            }
            case WHEEL_TIMER_NOT_ENABLE:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(String.format("accurate timer message is not enabled, timerWheelEnable is %s",
                    this.brokerController.getMessageStoreConfig().isTimerWheelEnable()));
                break;
            case SERVICE_NOT_AVAILABLE:
                response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
                response.setRemark(
                    "service not available now. It may be caused by one of the following reasons: " +
                        "the broker's disk is full [" + diskUtil() + "], messages are put to the slave, message store has been shut down, etc.");
                break;
            case OS_PAGE_CACHE_BUSY:
                response.setCode(ResponseCode.SYSTEM_BUSY);
                response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
                break;
            case LMQ_CONSUME_QUEUE_NUM_EXCEEDED:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("[LMQ_CONSUME_QUEUE_NUM_EXCEEDED]broker config enableLmq and enableMultiDispatch, lmq consumeQueue num exceed maxLmqConsumeQueueNum config num, default limit 2w.");
                break;
            case UNKNOWN_ERROR:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UNKNOWN_ERROR");
                break;
            default:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UNKNOWN_ERROR DEFAULT");
                break;
        }

        // Step 2: values needed by the send hook on both the success and the failure path.
        final String commercialOwner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
        final String accountAuthType = request.getExtFields().get(BrokerStatsManager.ACCOUNT_AUTH_TYPE);
        final String accountOwnerParent = request.getExtFields().get(BrokerStatsManager.ACCOUNT_OWNER_PARENT);
        final String accountOwnerSelf = request.getExtFields().get(BrokerStatsManager.ACCOUNT_OWNER_SELF);
        final int commercialSizePerMsg = brokerController.getBrokerConfig().getCommercialSizePerMsg();

        // Step 3 (failure): only the hook context is updated; the caller receives the error response.
        if (!stored) {
            if (hasSendMessageHook()) {
                final AppendMessageResult failedAppend = putMessageResult.getAppendMessageResult();

                // TODO process partial failures of batch message
                final int requestBodySize = request.getBody().length;
                // Report at least one message, even when no append result is available.
                int failedMsgNum = 1;
                if (failedAppend != null) {
                    failedMsgNum = Math.max(failedAppend.getMsgNum(), 1);
                }
                final int sizeBasedMsgNum = (int) Math.ceil((double) requestBodySize / commercialSizePerMsg);

                sendMessageContext.setSendStat(BrokerStatsManager.StatsType.SEND_FAILURE);
                sendMessageContext.setSendMsgNum(failedMsgNum);
                sendMessageContext.setSendMsgSize(requestBodySize);

                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
                sendMessageContext.setCommercialSendTimes(sizeBasedMsgNum);
                sendMessageContext.setCommercialSendMsgNum(sizeBasedMsgNum);
                sendMessageContext.setCommercialSendSize(requestBodySize);
                sendMessageContext.setCommercialOwner(commercialOwner);

                sendMessageContext.setAccountAuthType(accountAuthType);
                sendMessageContext.setAccountOwnerParent(accountOwnerParent);
                sendMessageContext.setAccountOwnerSelf(accountOwnerSelf);
            }
            return response;
        }

        // Step 3 (success): record statistics and metrics for the appended message(s).
        final AppendMessageResult appendResult = putMessageResult.getAppendMessageResult();
        final int storedMsgNum = appendResult.getMsgNum();
        final int storedBytes = appendResult.getWroteBytes();
        final BrokerStatsManager statsManager = this.brokerController.getBrokerStatsManager();
        final String topic = msg.getTopic();

        // Per-queue put statistics are recorded here only for the system schedule topic.
        if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic)) {
            statsManager.incQueuePutNums(topic, msg.getQueueId(), storedMsgNum, 1);
            statsManager.incQueuePutSize(topic, msg.getQueueId(), storedBytes);
        }

        statsManager.incTopicPutNums(topic, storedMsgNum, 1);
        statsManager.incTopicPutSize(topic, storedBytes);
        statsManager.incBrokerPutNums(topic, storedMsgNum);
        statsManager.incTopicPutLatency(topic, queueIdInt,
            (int) (this.brokerController.getMessageStore().now() - beginTimeMillis));

        if (!BrokerMetricsManager.isRetryOrDlqTopic(topic)) {
            final boolean systemTopic = TopicValidator.isSystemTopic(topic);
            Attributes metricLabels = BrokerMetricsManager.newAttributesBuilder()
                .put(LABEL_TOPIC, topic)
                .put(LABEL_MESSAGE_TYPE, messageType.getMetricsValue())
                .put(LABEL_IS_SYSTEM, systemTopic)
                .build();
            BrokerMetricsManager.messagesInTotal.add(storedMsgNum, metricLabels);
            BrokerMetricsManager.throughputInTotal.add(storedBytes, metricLabels);
            // Integer division: average bytes per message in this append.
            BrokerMetricsManager.messageSize.record(storedBytes / storedMsgNum, metricLabels);
        }

        // Step 4: build the success reply.
        response.setRemark(null);

        responseHeader.setMsgId(appendResult.getMsgId());
        responseHeader.setQueueId(queueIdInt);
        responseHeader.setQueueOffset(appendResult.getLogicsOffset());
        responseHeader.setTransactionId(MessageClientIDSetter.getUniqID(msg));
        attachRecallHandle(request, msg, responseHeader);

        // A non-null rewrite result is returned as-is: doResponse and the hook context update are skipped.
        final RemotingCommand staticTopicResult = rewriteResponseForStaticTopic(responseHeader, mappingContext);
        if (staticTopicResult != null) {
            return staticTopicResult;
        }

        doResponse(ctx, request, response);

        // Step 5: populate the hook context; null is returned because doResponse was already invoked above.
        if (!hasSendMessageHook()) {
            return null;
        }

        sendMessageContext.setMsgId(responseHeader.getMsgId());
        sendMessageContext.setQueueId(responseHeader.getQueueId());
        sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());

        final int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
        final int sizeBasedMsgNum = (int) Math.ceil((double) storedBytes / commercialSizePerMsg);

        sendMessageContext.setSendStat(BrokerStatsManager.StatsType.SEND_SUCCESS);
        sendMessageContext.setSendMsgNum(storedMsgNum);
        sendMessageContext.setSendMsgSize(storedBytes);

        sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
        sendMessageContext.setCommercialSendTimes(sizeBasedMsgNum * commercialBaseCount);
        sendMessageContext.setCommercialSendMsgNum(sizeBasedMsgNum);
        sendMessageContext.setCommercialSendSize(storedBytes);
        sendMessageContext.setCommercialOwner(commercialOwner);

        sendMessageContext.setAccountAuthType(accountAuthType);
        sendMessageContext.setAccountOwnerParent(accountOwnerParent);
        sendMessageContext.setAccountOwnerSelf(accountOwnerSelf);
        return null;
    }