in pulsar/consumer_partition.go [1617:1669]
// reconnectToBroker tries to re-establish the consumer's connection by calling
// grabConn, sleeping for a backoff delay before every attempt.
//
// The attempt budget comes from pc.options.maxReconnectToBroker: nil means
// retry without limit; otherwise the loop gives up after that many failed
// attempts. The loop also ends when the consumer is no longer in the
// consumerReady state, when grabConn succeeds, or when the failure message
// contains errTopicNotFount.
func (pc *partitionConsumer) reconnectToBroker() {
	// A negative budget is never decremented below, so it never reaches the
	// zero that terminates the loop.
	maxRetry := -1
	if pc.options.maxReconnectToBroker != nil {
		maxRetry = int(*pc.options.maxReconnectToBroker)
	}

	var (
		delayReconnectTime time.Duration
		defaultBackoff     = internal.DefaultBackoff{}
	)

	for maxRetry != 0 {
		if pc.getConsumerState() != consumerReady {
			// Consumer is already closing
			pc.log.Info("consumer state not ready, exit reconnect")
			return
		}

		// Use the caller-supplied backoff policy when there is one, otherwise
		// the local default backoff.
		if pc.options.backoffPolicy == nil {
			delayReconnectTime = defaultBackoff.Next()
		} else {
			delayReconnectTime = pc.options.backoffPolicy.Next()
		}
		pc.log.Info("Reconnecting to broker in ", delayReconnectTime)
		time.Sleep(delayReconnectTime)

		// Double check: the consumer state may have changed while this
		// goroutine was sleeping, and a consumer that is no longer ready must
		// not be re-created by grabConn.
		if pc.getConsumerState() != consumerReady {
			pc.log.Info("consumer state not ready, exit reconnect")
			return
		}

		err := pc.grabConn()
		if err == nil {
			// Successfully reconnected
			pc.log.Info("Reconnected consumer to broker")
			return
		}
		pc.log.WithError(err).Error("Failed to create consumer at reconnect")

		if strings.Contains(err.Error(), errTopicNotFount) {
			// When the topic is deleted, we should give up reconnection.
			pc.log.Warn("Topic Not Found.")
			return
		}

		// Only a positive (finite) budget is consumed by a failed attempt.
		if maxRetry > 0 {
			maxRetry--
		}
		pc.metrics.ConsumersReconnectFailure.Inc()

		// NOTE(review): defaultBackoff is only advanced when no custom
		// backoffPolicy is configured, so with a custom policy the second
		// condition presumably never fires — confirm whether the metric
		// should also track the custom policy.
		if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() {
			pc.metrics.ConsumersReconnectMaxRetry.Inc()
		}
	}
}