func()

in pulsar/producer_partition.go [391:441]


// reconnectToBroker repeatedly tries to re-establish the producer's
// connection by calling grabCnx, sleeping for a backoff delay before each
// attempt.
//
// The loop ends when one of the following happens:
//   - grabCnx succeeds;
//   - the producer state is no longer producerReady;
//   - the error message returned by grabCnx contains errTopicNotFount;
//   - the retry budget taken from options.MaxReconnectToBroker is used up.
//
// When options.MaxReconnectToBroker is nil the retry budget is unlimited.
func (p *partitionProducer) reconnectToBroker() {
	// A negative budget is never decremented below, so -1 means "retry forever".
	maxRetry := -1
	if p.options.MaxReconnectToBroker != nil {
		maxRetry = int(*p.options.MaxReconnectToBroker)
	}

	// The fallback backoff must live for the whole reconnect sequence.
	// Declaring it inside the loop would hand every attempt a fresh zero-value
	// instance, discarding whatever state Next accumulates between attempts and
	// making the IsMaxBackoffReached check below operate on an instance that
	// has only ever seen a single Next call.
	defaultBackoff := internal.DefaultBackoff{}

	for maxRetry != 0 {
		if p.getProducerState() != producerReady {
			// Producer is already closing
			p.log.Info("producer state not ready, exit reconnect")
			return
		}

		// A user-supplied policy takes precedence over the default backoff.
		var delayReconnectTime time.Duration
		if p.options.BackoffPolicy == nil {
			delayReconnectTime = defaultBackoff.Next()
		} else {
			delayReconnectTime = p.options.BackoffPolicy.Next()
		}
		p.log.Info("Reconnecting to broker in ", delayReconnectTime)
		time.Sleep(delayReconnectTime)

		// Bump the epoch before every connection attempt.
		atomic.AddUint64(&p.epoch, 1)
		err := p.grabCnx()
		if err == nil {
			// Successfully reconnected
			p.log.WithField("cnx", p._getConn().ID()).Info("Reconnected producer to broker")
			return
		}
		p.log.WithError(err).Error("Failed to create producer at reconnect")
		if strings.Contains(err.Error(), errTopicNotFount) {
			// When the topic is deleted, we should give up reconnection.
			p.log.Warn("Topic Not Found.")
			break
		}

		// Only a positive budget is consumed; a negative one stays unlimited.
		if maxRetry > 0 {
			maxRetry--
		}
		p.metrics.ProducersReconnectFailure.Inc()
		if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() {
			p.metrics.ProducersReconnectMaxRetry.Inc()
		}
	}
}