in pulsar/producer_partition.go [391:441]
// reconnectToBroker keeps trying to re-establish the producer's connection
// by calling grabCnx, sleeping for a backoff delay before every attempt.
//
// The retry budget comes from options.MaxReconnectToBroker: nil means retry
// without limit (tracked internally as -1), otherwise the value is the number
// of failed attempts tolerated before giving up.
//
// The loop ends when one of the following happens:
//   - grabCnx succeeds;
//   - the producer is no longer in the producerReady state;
//   - the error text contains errTopicNotFount;
//   - the retry budget reaches zero.
func (p *partitionProducer) reconnectToBroker() {
	maxRetry := -1
	if p.options.MaxReconnectToBroker != nil {
		maxRetry = int(*p.options.MaxReconnectToBroker)
	}

	// The backoff state must live outside the retry loop. Re-creating it on
	// every iteration would restart the delay sequence each time, so the
	// delay would never grow and IsMaxBackoffReached would always be asked
	// of a brand-new instance.
	var (
		delayReconnectTime time.Duration
		defaultBackoff     = internal.DefaultBackoff{}
	)

	for maxRetry != 0 {
		if p.getProducerState() != producerReady {
			// Producer is already closing
			p.log.Info("producer state not ready, exit reconnect")
			return
		}

		// A user-supplied policy takes precedence over the built-in backoff.
		if p.options.BackoffPolicy == nil {
			delayReconnectTime = defaultBackoff.Next()
		} else {
			delayReconnectTime = p.options.BackoffPolicy.Next()
		}
		p.log.Info("Reconnecting to broker in ", delayReconnectTime)
		time.Sleep(delayReconnectTime)

		// The epoch is bumped before every connection attempt, including
		// attempts that end up failing.
		atomic.AddUint64(&p.epoch, 1)
		err := p.grabCnx()
		if err == nil {
			// Successfully reconnected
			p.log.WithField("cnx", p._getConn().ID()).Info("Reconnected producer to broker")
			return
		}
		p.log.WithError(err).Error("Failed to create producer at reconnect")

		if strings.Contains(err.Error(), errTopicNotFount) {
			// when topic is deleted, we should give up reconnection.
			p.log.Warn("Topic Not Found.")
			break
		}

		// Only a bounded budget is decremented; -1 (unlimited) stays as is.
		if maxRetry > 0 {
			maxRetry--
		}
		p.metrics.ProducersReconnectFailure.Inc()
		if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() {
			p.metrics.ProducersReconnectMaxRetry.Inc()
		}
	}
}