in producer/producer.go [314:371]
func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message, resp *primitive.SendResult) error {
retryTime := 1 + p.options.RetryTimes
var (
err error
mq *primitive.MessageQueue
)
var (
producerCtx *primitive.ProducerCtx
ok bool
)
for retryCount := 0; retryCount < retryTime; retryCount++ {
var lastBrokerName string
if mq != nil {
lastBrokerName = mq.BrokerName
}
mq = p.selectMessageQueue(msg, lastBrokerName)
if mq == nil {
err = fmt.Errorf("the topic=%s route info not found", msg.Topic)
continue
}
if lastBrokerName != "" {
rlog.Warning("start retrying to send, ", map[string]interface{}{
"lastBroker": lastBrokerName,
"newBroker": mq.BrokerName,
})
}
addr := p.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
if addr == "" {
return fmt.Errorf("topic=%s route info not found", mq.Topic)
}
if p.interceptor != nil {
producerCtx, ok = primitive.GetProducerCtx(ctx)
if !ok {
return fmt.Errorf("ProducerCtx Not Exist")
}
producerCtx.BrokerAddr = addr
producerCtx.MQ = *mq
}
res, _err := p.client.InvokeSync(ctx, addr, p.buildSendRequest(mq, msg), p.options.SendMsgTimeout)
if _err != nil {
err = _err
continue
}
if needRetryCode(res.Code) && retryCount < retryTime-1 {
continue
}
return p.client.ProcessSendResponse(mq.BrokerName, res, resp, msg)
}
return err
}