func()

in producer/producer.go [314:371]


func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message, resp *primitive.SendResult) error {

	retryTime := 1 + p.options.RetryTimes

	var (
		err error
		mq  *primitive.MessageQueue
	)

	var (
		producerCtx *primitive.ProducerCtx
		ok          bool
	)
	for retryCount := 0; retryCount < retryTime; retryCount++ {
		var lastBrokerName string
		if mq != nil {
			lastBrokerName = mq.BrokerName
		}
		mq = p.selectMessageQueue(msg, lastBrokerName)
		if mq == nil {
			err = fmt.Errorf("the topic=%s route info not found", msg.Topic)
			continue
		}

		if lastBrokerName != "" {
			rlog.Warning("start retrying to send, ", map[string]interface{}{
				"lastBroker": lastBrokerName,
				"newBroker":  mq.BrokerName,
			})
		}

		addr := p.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
		if addr == "" {
			return fmt.Errorf("topic=%s route info not found", mq.Topic)
		}

		if p.interceptor != nil {
			producerCtx, ok = primitive.GetProducerCtx(ctx)
			if !ok {
				return fmt.Errorf("ProducerCtx Not Exist")
			}
			producerCtx.BrokerAddr = addr
			producerCtx.MQ = *mq
		}

		res, _err := p.client.InvokeSync(ctx, addr, p.buildSendRequest(mq, msg), p.options.SendMsgTimeout)
		if _err != nil {
			err = _err
			continue
		}

		if needRetryCode(res.Code) && retryCount < retryTime-1 {
			continue
		}
		return p.client.ProcessSendResponse(mq.BrokerName, res, resp, msg)
	}
	return err
}