func()

in consumer/consumer.go [1057:1116]


// sendMessageBackAsNormal re-publishes msg to this consumer group's retry
// topic as a regular message, using a synchronous request to the broker that
// owns the selected publish queue.
//
// The new message carries the body, flag and properties of msg. In addition:
//   - PropertyOriginMessageId is set to the existing origin id of msg, or to
//     msg.MsgId when msg has none;
//   - PropertyRetryTopic is set to the topic msg was consumed from;
//   - PropertyTransactionPrepared is removed;
//   - the delay level is set to 3 + msg.ReconsumeTimes.
//
// maxReconsumeTimes is forwarded to buildSendToRetryRequest together with the
// incremented reconsume count (msg.ReconsumeTimes + 1).
//
// It returns true only when the broker answers with internal.ResSuccess.
// Every failure path logs a warning and returns false; no error is returned
// to the caller.
func (dc *defaultConsumer) sendMessageBackAsNormal(msg *primitive.MessageExt, maxReconsumeTimes int32) bool {
	retryTopic := internal.GetRetryTopic(dc.consumerGroup)
	normalMsg := &primitive.Message{
		Topic: retryTopic,
		Body:  msg.Body,
		Flag:  msg.Flag,
	}
	normalMsg.WithProperties(msg.GetProperties())

	// Keep the id of the very first delivery: reuse the origin id if msg is
	// itself already a retry, otherwise msg's own id is the origin.
	originMsgId := msg.GetProperty(primitive.PropertyOriginMessageId)
	if len(originMsgId) == 0 {
		originMsgId = msg.MsgId
	}
	normalMsg.WithProperty(primitive.PropertyOriginMessageId, originMsgId)
	normalMsg.WithProperty(primitive.PropertyRetryTopic, msg.Topic)
	// The properties were copied wholesale above; strip the
	// transaction-prepared marker so it is not carried onto the retry message.
	normalMsg.RemoveProperty(primitive.PropertyTransactionPrepared)
	// Delay level increases by one with every previous reconsume attempt.
	normalMsg.WithDelayTimeLevel(int(3 + msg.ReconsumeTimes))

	mq, err := dc.findPublishMessageQueue(retryTopic)
	if err != nil {
		rlog.Warning("sendMessageBackAsNormal find publish message queue error", map[string]interface{}{
			rlog.LogKeyTopic:         retryTopic,
			rlog.LogKeyMessageId:     msg.MsgId,
			rlog.LogKeyUnderlayError: err.Error(),
		})
		return false
	}

	brokerAddr := dc.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
	if len(brokerAddr) == 0 {
		// err is necessarily nil here (the non-nil case returned above), so it
		// must not be dereferenced: calling err.Error() would panic.
		rlog.Warning("sendMessageBackAsNormal cannot find broker address", map[string]interface{}{
			rlog.LogKeyTopic:     retryTopic,
			rlog.LogKeyMessageId: msg.MsgId,
			rlog.LogKeyBroker:    mq.BrokerName,
		})
		return false
	}

	request := buildSendToRetryRequest(mq, normalMsg, msg.ReconsumeTimes+1, maxReconsumeTimes)
	resp, err := dc.client.InvokeSync(context.Background(), brokerAddr, request, _SendMessageBackAsNormalTimeout)
	if err != nil {
		rlog.Warning("sendMessageBackAsNormal failed to invoke", map[string]interface{}{
			rlog.LogKeyTopic:         retryTopic,
			rlog.LogKeyMessageId:     msg.MsgId,
			rlog.LogKeyBroker:        brokerAddr,
			rlog.LogKeyUnderlayError: err.Error(),
		})
		return false
	}
	if resp.Code != internal.ResSuccess {
		rlog.Warning("sendMessageBackAsNormal failed to send", map[string]interface{}{
			rlog.LogKeyTopic:         retryTopic,
			rlog.LogKeyMessageId:     msg.MsgId,
			rlog.LogKeyBroker:        brokerAddr,
			rlog.LogKeyUnderlayError: fmt.Errorf("CODE: %d, DESC: %s", resp.Code, resp.Remark),
		})
		return false
	}

	return true
}