in broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java [367:545]
/**
 * Converts the result of a store put into the reply for a send-message request.
 *
 * <p>The put status is first mapped to a response code (and, for failures, a remark). When the
 * status counts as a successful send, put statistics and metrics are recorded, the response
 * header is filled from the append result, and the response is handed to {@code doResponse}.
 * In both the success and the failure case the {@code sendMessageContext} is populated, but
 * only when {@code hasSendMessageHook()} is true.
 *
 * @param putMessageResult result returned by the store; {@code null} yields a SYSTEM_ERROR response
 * @param response command whose code and remark are set here
 * @param request original request; its ext fields and body length feed the hook context
 * @param msg message that was put
 * @param responseHeader header filled with msg id, queue id, queue offset and transaction id on success
 * @param sendMessageContext hook context to populate
 * @param ctx channel context passed through to {@code doResponse}
 * @param queueIdInt queue id reported in the response header and in the latency statistic
 * @param beginTimeMillis start timestamp used to compute the put latency
 * @param mappingContext passed to {@code rewriteResponseForStaticTopic}
 * @param messageType message type whose metrics value labels the metrics
 * @return {@code null} after a successful send has been handed to {@code doResponse}; the
 *         non-null result of {@code rewriteResponseForStaticTopic} if there is one; otherwise
 *         {@code response} carrying the failure code and remark
 */
private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
    RemotingCommand request, MessageExt msg, SendMessageResponseHeader responseHeader,
    SendMessageContext sendMessageContext, ChannelHandlerContext ctx, int queueIdInt, long beginTimeMillis,
    TopicQueueMappingContext mappingContext, TopicMessageType messageType) {
    if (putMessageResult == null) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("store putMessage return null");
        return response;
    }

    // Step 1: map the store status to a response code, plus a remark for failures.
    boolean stored = false;
    int responseCode;
    String failureRemark = null;
    switch (putMessageResult.getPutMessageStatus()) {
        // Statuses treated as a successful send; no remark is set for them here.
        case PUT_OK:
            responseCode = ResponseCode.SUCCESS;
            stored = true;
            break;
        case FLUSH_DISK_TIMEOUT:
            responseCode = ResponseCode.FLUSH_DISK_TIMEOUT;
            stored = true;
            break;
        case FLUSH_SLAVE_TIMEOUT:
            responseCode = ResponseCode.FLUSH_SLAVE_TIMEOUT;
            stored = true;
            break;
        case SLAVE_NOT_AVAILABLE:
            responseCode = ResponseCode.SLAVE_NOT_AVAILABLE;
            stored = true;
            break;

        // Statuses treated as a failed send; each one carries an explanatory remark.
        case IN_SYNC_REPLICAS_NOT_ENOUGH:
            responseCode = ResponseCode.SYSTEM_ERROR;
            failureRemark = "in-sync replicas not enough";
            break;
        case CREATE_MAPPED_FILE_FAILED:
            responseCode = ResponseCode.SYSTEM_ERROR;
            failureRemark = "create mapped file failed, server is busy or broken.";
            break;
        case MESSAGE_ILLEGAL:
        case PROPERTIES_SIZE_EXCEEDED:
            responseCode = ResponseCode.MESSAGE_ILLEGAL;
            failureRemark = String.format(
                "the message is illegal, maybe msg body or properties length not matched. msg body length limit %dB, msg properties length limit 32KB.",
                this.brokerController.getMessageStoreConfig().getMaxMessageSize());
            break;
        case WHEEL_TIMER_MSG_ILLEGAL:
            responseCode = ResponseCode.MESSAGE_ILLEGAL;
            failureRemark = String.format(
                "timer message illegal, the delay time should not be bigger than the max delay %dms; or if set del msg, the delay time should be bigger than the current time",
                this.brokerController.getMessageStoreConfig().getTimerMaxDelaySec() * 1000L);
            break;
        case WHEEL_TIMER_FLOW_CONTROL:
            responseCode = ResponseCode.SYSTEM_ERROR;
            failureRemark = String.format(
                "timer message is under flow control, max num limit is %d or the current value is greater than %d and less than %d, trigger random flow control",
                this.brokerController.getMessageStoreConfig().getTimerCongestNumEachSlot() * 2L,
                this.brokerController.getMessageStoreConfig().getTimerCongestNumEachSlot(),
                this.brokerController.getMessageStoreConfig().getTimerCongestNumEachSlot() * 2L);
            break;
        case WHEEL_TIMER_NOT_ENABLE:
            responseCode = ResponseCode.SYSTEM_ERROR;
            failureRemark = String.format(
                "accurate timer message is not enabled, timerWheelEnable is %s",
                this.brokerController.getMessageStoreConfig().isTimerWheelEnable());
            break;
        case SERVICE_NOT_AVAILABLE:
            responseCode = ResponseCode.SERVICE_NOT_AVAILABLE;
            failureRemark = "service not available now. It may be caused by one of the following reasons: "
                + "the broker's disk is full [" + diskUtil() + "], messages are put to the slave, message store has been shut down, etc.";
            break;
        case OS_PAGE_CACHE_BUSY:
            responseCode = ResponseCode.SYSTEM_BUSY;
            failureRemark = "[PC_SYNCHRONIZED]broker busy, start flow control for a while";
            break;
        case LMQ_CONSUME_QUEUE_NUM_EXCEEDED:
            responseCode = ResponseCode.SYSTEM_ERROR;
            failureRemark = "[LMQ_CONSUME_QUEUE_NUM_EXCEEDED]broker config enableLmq and enableMultiDispatch, lmq consumeQueue num exceed maxLmqConsumeQueueNum config num, default limit 2w.";
            break;
        case UNKNOWN_ERROR:
            responseCode = ResponseCode.SYSTEM_ERROR;
            failureRemark = "UNKNOWN_ERROR";
            break;
        default:
            responseCode = ResponseCode.SYSTEM_ERROR;
            failureRemark = "UNKNOWN_ERROR DEFAULT";
            break;
    }
    response.setCode(responseCode);
    if (failureRemark != null) {
        response.setRemark(failureRemark);
    }

    // Step 2: read the accounting fields of the request that the hook context needs on both paths.
    String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
    String authType = request.getExtFields().get(BrokerStatsManager.ACCOUNT_AUTH_TYPE);
    String ownerParent = request.getExtFields().get(BrokerStatsManager.ACCOUNT_OWNER_PARENT);
    String ownerSelf = request.getExtFields().get(BrokerStatsManager.ACCOUNT_OWNER_SELF);
    int commercialSizePerMsg = brokerController.getBrokerConfig().getCommercialSizePerMsg();

    // Step 3a: failed send -> only describe the failure to the hook, then return the error response.
    if (!stored) {
        if (hasSendMessageHook()) {
            AppendMessageResult failedAppend = putMessageResult.getAppendMessageResult();
            // TODO process partial failures of batch message
            int bodySize = request.getBody().length;
            // Report at least one message, even if the append result is missing.
            int failedMsgNum = 1;
            if (failedAppend != null) {
                failedMsgNum = Math.max(failedAppend.getMsgNum(), 1);
            }
            int billedMsgNum = (int) Math.ceil(bodySize / (double) commercialSizePerMsg);

            sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
            sendMessageContext.setCommercialSendTimes(billedMsgNum);
            sendMessageContext.setCommercialSendSize(bodySize);
            sendMessageContext.setCommercialOwner(owner);
            sendMessageContext.setSendStat(BrokerStatsManager.StatsType.SEND_FAILURE);
            sendMessageContext.setCommercialSendMsgNum(billedMsgNum);
            sendMessageContext.setAccountAuthType(authType);
            sendMessageContext.setAccountOwnerParent(ownerParent);
            sendMessageContext.setAccountOwnerSelf(ownerSelf);
            sendMessageContext.setSendMsgSize(bodySize);
            sendMessageContext.setSendMsgNum(failedMsgNum);
        }
        return response;
    }

    // Step 3b: successful send -> record statistics and metrics from the append result.
    String topic = msg.getTopic();
    AppendMessageResult appendResult = putMessageResult.getAppendMessageResult();
    int msgNum = appendResult.getMsgNum();
    int wroteBytes = appendResult.getWroteBytes();
    BrokerStatsManager statsManager = this.brokerController.getBrokerStatsManager();

    // Per-queue counters are only updated for the system schedule topic.
    if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic)) {
        statsManager.incQueuePutNums(topic, msg.getQueueId(), msgNum, 1);
        statsManager.incQueuePutSize(topic, msg.getQueueId(), wroteBytes);
    }
    statsManager.incTopicPutNums(topic, msgNum, 1);
    statsManager.incTopicPutSize(topic, wroteBytes);
    statsManager.incBrokerPutNums(topic, msgNum);
    int putLatency = (int) (this.brokerController.getMessageStore().now() - beginTimeMillis);
    statsManager.incTopicPutLatency(topic, queueIdInt, putLatency);

    boolean retryOrDlq = BrokerMetricsManager.isRetryOrDlqTopic(topic);
    if (!retryOrDlq) {
        Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
            .put(LABEL_TOPIC, topic)
            .put(LABEL_MESSAGE_TYPE, messageType.getMetricsValue())
            .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic))
            .build();
        BrokerMetricsManager.messagesInTotal.add(msgNum, attributes);
        BrokerMetricsManager.throughputInTotal.add(wroteBytes, attributes);
        // Integer division: recorded size is the per-message average of the written bytes.
        BrokerMetricsManager.messageSize.record(wroteBytes / msgNum, attributes);
    }

    // Step 4: fill in the response header and hand the response over.
    response.setRemark(null);
    responseHeader.setMsgId(appendResult.getMsgId());
    responseHeader.setQueueId(queueIdInt);
    responseHeader.setQueueOffset(appendResult.getLogicsOffset());
    responseHeader.setTransactionId(MessageClientIDSetter.getUniqID(msg));
    attachRecallHandle(request, msg, responseHeader);

    RemotingCommand rewriteResult = rewriteResponseForStaticTopic(responseHeader, mappingContext);
    if (rewriteResult != null) {
        return rewriteResult;
    }

    doResponse(ctx, request, response);

    // Step 5: describe the successful send to the hook. Id, queue and offset are read back from
    // the header rather than reused from above, since the header is what was handed out.
    if (hasSendMessageHook()) {
        int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
        int billedMsgNum = (int) Math.ceil(wroteBytes / (double) commercialSizePerMsg);
        int billedTimes = billedMsgNum * commercialBaseCount;

        sendMessageContext.setMsgId(responseHeader.getMsgId());
        sendMessageContext.setQueueId(responseHeader.getQueueId());
        sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
        sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
        sendMessageContext.setCommercialSendTimes(billedTimes);
        sendMessageContext.setCommercialSendSize(wroteBytes);
        sendMessageContext.setCommercialOwner(owner);
        sendMessageContext.setSendStat(BrokerStatsManager.StatsType.SEND_SUCCESS);
        sendMessageContext.setCommercialSendMsgNum(billedMsgNum);
        sendMessageContext.setAccountAuthType(authType);
        sendMessageContext.setAccountOwnerParent(ownerParent);
        sendMessageContext.setAccountOwnerSelf(ownerSelf);
        sendMessageContext.setSendMsgSize(wroteBytes);
        sendMessageContext.setSendMsgNum(msgNum);
    }
    return null;
}