in pulsar/consumer_partition.go [1852:1920]
// reconnectToBroker tries to re-establish the consumer's connection via
// grabConn, retrying through internal.Retry with delays taken from the
// consumer's backoff policy.
//
// If connectionClosed is non-nil and reports a URL, the first attempt is made
// against that assigned broker; every later attempt passes an empty URL.
//
// An attempt reports success to the retry helper (and so asks for no further
// retries) when any of the following holds:
//   - the retry budget from options.maxReconnectToBroker is zero (a nil
//     option means the budget is never decremented),
//   - the consumer state is no longer consumerReady,
//   - grabConn succeeds, in which case the backoff is reset,
//   - the error text contains errMsgTopicNotFound.
//
// Any other failure decrements a positive budget, increments the
// ConsumersReconnectFailure metric and returns the error to the retry helper.
func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClosed) {
	// A reconnection supersedes any seek that was in flight.
	if pc.isSeeking.CompareAndSwap(true, false) {
		pc.log.Debug("seek operation triggers reconnection, and reset isSeeking")
	}

	// -1 is never decremented below, so it stands for "no limit".
	remaining := -1
	if limit := pc.options.maxReconnectToBroker; limit != nil {
		remaining = int(*limit)
	}

	bo := pc.backoffPolicyFunc()

	brokerURL := ""
	if connectionClosed != nil && connectionClosed.HasURL() {
		brokerURL = connectionClosed.assignedBrokerURL
	}

	// attempt performs one reconnection try. It mutates remaining and
	// brokerURL, which are shared with the delay callback below.
	attempt := func() (struct{}, error) {
		var none struct{}

		if remaining == 0 {
			return none, nil
		}
		if state := pc.getConsumerState(); state != consumerReady {
			// Consumer is already closing
			pc.log.Info("consumer state not ready, exit reconnect")
			return none, nil
		}

		err := pc.grabConn(brokerURL)
		// The assigned broker is attempted just once; from here on the URL
		// stays empty.
		brokerURL = ""

		if err == nil {
			pc.log.Info("Reconnected consumer to broker")
			bo.Reset()
			return none, nil
		}

		pc.log.WithError(err).Error("Failed to create consumer at reconnect")
		if strings.Contains(err.Error(), errMsgTopicNotFound) {
			// The topic has been deleted, so reconnecting is pointless.
			pc.log.Warn("Topic Not Found.")
			return none, nil
		}

		if remaining > 0 {
			remaining--
		}
		pc.metrics.ConsumersReconnectFailure.Inc()
		if remaining == 0 || bo.IsMaxBackoffReached() {
			pc.metrics.ConsumersReconnectMaxRetry.Inc()
		}
		// NOTE(review): when the budget has just reached zero the error is
		// still returned; the exhausted budget is only noticed at the top of
		// the next invocation.
		return none, err
	}

	// nextDelay supplies the wait before the following attempt.
	nextDelay := func(_ error) time.Duration {
		wait := bo.Next()
		// NOTE(review): attempt clears brokerURL before returning, so this
		// field is presumably always empty when logged here; confirm against
		// when internal.Retry invokes this callback.
		pc.log.WithFields(log.Fields{
			"assignedBrokerURL":  brokerURL,
			"delayReconnectTime": wait,
		}).Info("Reconnecting to broker")
		return wait
	}

	// This method has no return value, so the retry outcome is dropped;
	// failures have already been logged inside attempt.
	_, _ = internal.Retry(pc.ctx, attempt, nextDelay)
}