in pulsar/producer_partition.go [474:559]
// reconnectToBroker tries to re-establish the producer's broker connection by
// calling grabCnx under internal.Retry, sleeping between attempts for the
// duration chosen by the producer's backoff policy.
//
// If connectionClosed is non-nil and carries a URL, that URL is passed to the
// first grabCnx call only; every later attempt passes an empty URL.
//
// An attempt reports "nothing more to do" (nil error) when:
//   - the retry budget derived from options.MaxReconnectToBroker is zero,
//   - the producer is no longer in the producerReady state,
//   - grabCnx succeeds, or
//   - the broker error text matches one of the terminal conditions (topic not
//     found, topic terminated, producer blocked by quota, producer fenced).
//
// Any other failure is returned to internal.Retry so that it can schedule
// another attempt.
// NOTE(review): presumably internal.Retry stops on a nil error or when p.ctx
// is done — confirm against its implementation.
func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed) {
	// A nil MaxReconnectToBroker maps to -1. That value is never decremented
	// and never equals zero, so the budget checks below never trigger for it.
	retriesLeft := -1
	if limit := p.options.MaxReconnectToBroker; limit != nil {
		retriesLeft = int(*limit)
	}

	backoff := p.backOffPolicyFunc()

	targetURL := ""
	if connectionClosed != nil && connectionClosed.HasURL() {
		targetURL = connectionClosed.assignedBrokerURL
	}

	attempt := func() (struct{}, error) {
		var done struct{}

		if retriesLeft == 0 {
			return done, nil
		}
		if p.getProducerState() != producerReady {
			// Producer is already closing.
			p.log.Info("producer state not ready, exit reconnect")
			return done, nil
		}

		// The epoch is bumped before every connection attempt.
		atomic.AddUint64(&p.epoch, 1)
		err := p.grabCnx(targetURL)
		// The assigned broker URL is a one-shot hint: drop it after the
		// first attempt regardless of the outcome.
		targetURL = ""

		if err == nil {
			p.log.WithField("cnx", p._getConn().ID()).Info("Reconnected producer to broker")
			backoff.Reset()
			return done, nil
		}

		p.log.WithError(err).Error("Failed to create producer at reconnect")

		// Terminal broker errors are recognised by substring match on the
		// error text; each of them ends the reconnect loop.
		switch msg := err.Error(); {
		case strings.Contains(msg, errMsgTopicNotFound):
			// The topic was deleted, so give up and close the producer.
			p.log.Warn("Topic not found, stop reconnecting, close the producer")
			p.doClose(errors.Join(ErrTopicNotfound, err))
			return done, nil
		case strings.Contains(msg, errMsgTopicTerminated):
			p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting, close the producer")
			p.doClose(errors.Join(ErrTopicTerminated, err))
			return done, nil
		case strings.Contains(msg, errMsgProducerBlockedQuotaExceededException):
			// Unlike the other terminal cases, this one only fails the
			// pending messages and does not call doClose.
			p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting")
			p.failPendingMessages(errors.Join(ErrProducerBlockedQuotaExceeded, err))
			return done, nil
		case strings.Contains(msg, errMsgProducerFenced):
			p.log.Warn("Producer was fenced, failing pending messages, stop reconnecting")
			p.doClose(errors.Join(ErrProducerFenced, err))
			return done, nil
		}

		// Retryable failure: consume one unit of a finite budget and record
		// the failure in the metrics.
		if retriesLeft > 0 {
			retriesLeft--
		}
		p.metrics.ProducersReconnectFailure.Inc()
		if retriesLeft == 0 || backoff.IsMaxBackoffReached() {
			p.metrics.ProducersReconnectMaxRetry.Inc()
		}
		return done, err
	}

	// nextDelay asks the backoff policy for the next wait and logs it.
	// NOTE(review): targetURL is cleared inside attempt after the first
	// grabCnx call, so this field is likely empty whenever the callback runs
	// after a failed attempt — confirm against internal.Retry's call order.
	nextDelay := func(_ error) time.Duration {
		delayReconnectTime := backoff.Next()
		p.log.WithFields(log.Fields{
			"assignedBrokerURL":  targetURL,
			"delayReconnectTime": delayReconnectTime,
		}).Info("Reconnecting to broker")
		return delayReconnectTime
	}

	// The result of Retry is intentionally discarded: every outcome has
	// already been logged or acted upon inside attempt.
	_, _ = internal.Retry(p.ctx, attempt, nextDelay)
}