in consumer/consumer.go [1057:1116]
// sendMessageBackAsNormal re-publishes msg to the consumer group's retry topic
// as a regular message by sending a request built with buildSendToRetryRequest
// directly to a broker.
//
// The copy carries msg's body, flag and properties, records the origin message
// id and the original topic as properties, and is given a delay level derived
// from msg.ReconsumeTimes. maxReconsumeTimes is forwarded to the request
// builder together with the incremented reconsume count.
//
// It returns true only when the broker answers with internal.ResSuccess. Every
// failure (no publish queue, unknown broker address, invoke error, non-success
// response code) is logged as a warning and reported as false.
func (dc *defaultConsumer) sendMessageBackAsNormal(msg *primitive.MessageExt, maxReconsumeTimes int32) bool {
	retryTopic := internal.GetRetryTopic(dc.consumerGroup)
	normalMsg := &primitive.Message{
		Topic: retryTopic,
		Body:  msg.Body,
		Flag:  msg.Flag,
	}
	normalMsg.WithProperties(msg.GetProperties())

	// Keep an already recorded origin message id; only fall back to the
	// current message id when the property is absent.
	originMsgId := msg.GetProperty(primitive.PropertyOriginMessageId)
	if len(originMsgId) == 0 {
		originMsgId = msg.MsgId
	}
	normalMsg.WithProperty(primitive.PropertyOriginMessageId, originMsgId)
	normalMsg.WithProperty(primitive.PropertyRetryTopic, msg.Topic)
	normalMsg.RemoveProperty(primitive.PropertyTransactionPrepared)
	// The delay level increases with the number of times the message has
	// already been reconsumed.
	normalMsg.WithDelayTimeLevel(int(3 + msg.ReconsumeTimes))

	mq, err := dc.findPublishMessageQueue(retryTopic)
	if err != nil {
		rlog.Warning("sendMessageBackAsNormal find publish message queue error", map[string]interface{}{
			rlog.LogKeyTopic:         retryTopic,
			rlog.LogKeyMessageId:     msg.MsgId,
			rlog.LogKeyUnderlayError: err.Error(),
		})
		return false
	}

	brokerAddr := dc.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
	if len(brokerAddr) == 0 {
		// err is guaranteed to be nil on this path (it was checked above), so
		// it must not be dereferenced here: calling err.Error() would panic.
		rlog.Warning("sendMessageBackAsNormal cannot find broker address", map[string]interface{}{
			rlog.LogKeyTopic:     retryTopic,
			rlog.LogKeyMessageId: msg.MsgId,
			rlog.LogKeyBroker:    mq.BrokerName,
		})
		return false
	}

	request := buildSendToRetryRequest(mq, normalMsg, msg.ReconsumeTimes+1, maxReconsumeTimes)
	resp, err := dc.client.InvokeSync(context.Background(), brokerAddr, request, _SendMessageBackAsNormalTimeout)
	if err != nil {
		rlog.Warning("sendMessageBackAsNormal failed to invoke", map[string]interface{}{
			rlog.LogKeyTopic:         retryTopic,
			rlog.LogKeyMessageId:     msg.MsgId,
			rlog.LogKeyBroker:        brokerAddr,
			rlog.LogKeyUnderlayError: err.Error(),
		})
		return false
	}
	if resp.Code != internal.ResSuccess {
		rlog.Warning("sendMessageBackAsNormal failed to send", map[string]interface{}{
			rlog.LogKeyTopic:         retryTopic,
			rlog.LogKeyMessageId:     msg.MsgId,
			rlog.LogKeyBroker:        brokerAddr,
			rlog.LogKeyUnderlayError: fmt.Errorf("CODE: %d, DESC: %s", resp.Code, resp.Remark),
		})
		return false
	}
	return true
}