in broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java [92:334]
protected RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumerSendMsgBackRequestHeader requestHeader =
(ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
// The send back requests sent to SlaveBroker will be forwarded to the master broker beside
final BrokerController masterBroker = this.brokerController.peekMasterBroker();
if (null == masterBroker) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("no master available along with " + brokerController.getBrokerConfig().getBrokerIP1());
return response;
}
// The broker that received the request.
// It may be a master broker or a slave broker
final BrokerController currentBroker = this.brokerController;
SubscriptionGroupConfig subscriptionGroupConfig =
masterBroker.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return response;
}
BrokerConfig masterBrokerConfig = masterBroker.getBrokerConfig();
if (!PermName.isWriteable(masterBrokerConfig.getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + masterBrokerConfig.getBrokerIP1() + "] sending message is forbidden");
return response;
}
if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = this.random.nextInt(subscriptionGroupConfig.getRetryQueueNums());
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
// Create retry topic to master broker
TopicConfig topicConfig = masterBroker.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
if (!PermName.isWriteable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
return response;
}
// Look message from the origin message store
MessageExt msgExt = currentBroker.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " + requestHeader.getOffset());
return response;
}
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);
int delayLevel = requestHeader.getDelayLevel();
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
Integer times = requestHeader.getMaxReconsumeTimes();
if (times != null) {
maxReconsumeTimes = times;
}
}
boolean isDLQ = false;
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
.put(LABEL_CONSUMER_GROUP, requestHeader.getGroup())
.put(LABEL_TOPIC, requestHeader.getOriginTopic())
.put(LABEL_IS_SYSTEM, BrokerMetricsManager.isSystem(requestHeader.getOriginTopic(), requestHeader.getGroup()))
.build();
BrokerMetricsManager.sendToDlqMessages.add(1, attributes);
isDLQ = true;
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = randomQueueId(DLQ_NUMS_PER_GROUP);
// Create DLQ topic to master broker
topicConfig = masterBroker.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE | PermName.PERM_READ, 0);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
msgExt.setDelayTimeLevel(0);
} else {
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
}
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
boolean succeeded = false;
// Put retry topic to master message store
PutMessageResult putMessageResult = masterBroker.getMessageStore().putMessage(msgInner);
if (putMessageResult != null) {
String commercialOwner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
switch (putMessageResult.getPutMessageStatus()) {
case PUT_OK:
String backTopic = msgExt.getTopic();
String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (correctTopic != null) {
backTopic = correctTopic;
}
if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(msgInner.getTopic())) {
masterBroker.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
masterBroker.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
masterBroker.getBrokerStatsManager().incQueuePutNums(msgInner.getTopic(), msgInner.getQueueId());
masterBroker.getBrokerStatsManager().incQueuePutSize(msgInner.getTopic(), msgInner.getQueueId(), putMessageResult.getAppendMessageResult().getWroteBytes());
}
masterBroker.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
if (isDLQ) {
masterBroker.getBrokerStatsManager().incDLQStatValue(
BrokerStatsManager.SNDBCK2DLQ_TIMES,
commercialOwner,
requestHeader.getGroup(),
requestHeader.getOriginTopic(),
BrokerStatsManager.StatsType.SEND_BACK_TO_DLQ.name(),
1);
String uniqKey = msgInner.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
DLQ_LOG.info("send msg to DLQ {}, owner={}, originalTopic={}, consumerId={}, msgUniqKey={}, storeTimestamp={}",
newTopic,
commercialOwner,
requestHeader.getOriginTopic(),
requestHeader.getGroup(),
uniqKey,
putMessageResult.getAppendMessageResult().getStoreTimestamp());
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
succeeded = true;
break;
default:
break;
}
if (!succeeded) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(putMessageResult.getPutMessageStatus().name());
}
} else {
if (isDLQ) {
String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
String uniqKey = msgInner.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
DLQ_LOG.info("failed to send msg to DLQ {}, owner={}, originalTopic={}, consumerId={}, msgUniqKey={}, result={}",
newTopic,
owner,
requestHeader.getOriginTopic(),
requestHeader.getGroup(),
uniqKey,
"null");
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("putMessageResult is null");
}
if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
ConsumeMessageContext context = new ConsumeMessageContext();
context.setNamespace(namespace);
context.setTopic(requestHeader.getOriginTopic());
context.setConsumerGroup(requestHeader.getGroup());
context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
context.setCommercialRcvTimes(1);
context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));
context.setAccountAuthType(request.getExtFields().get(BrokerStatsManager.ACCOUNT_AUTH_TYPE));
context.setAccountOwnerParent(request.getExtFields().get(BrokerStatsManager.ACCOUNT_OWNER_PARENT));
context.setAccountOwnerSelf(request.getExtFields().get(BrokerStatsManager.ACCOUNT_OWNER_SELF));
context.setRcvStat(isDLQ ? BrokerStatsManager.StatsType.SEND_BACK_TO_DLQ : BrokerStatsManager.StatsType.SEND_BACK);
context.setSuccess(succeeded);
context.setRcvMsgNum(1);
//Set msg body size 0 when sent back by consumer.
context.setRcvMsgSize(0);
context.setCommercialRcvMsgNum(succeeded ? 1 : 0);
try {
this.executeConsumeMessageHookAfter(context);
} catch (AbortProcessException e) {
response.setCode(e.getResponseCode());
response.setRemark(e.getErrorMessage());
}
}
return response;
}