func()

in pulsar/consumer_partition.go [1617:1669]


// reconnectToBroker keeps calling grabConn, sleeping for a backoff delay
// before every attempt, until one of the following happens:
//
//   - grabConn succeeds;
//   - the consumer is no longer in the consumerReady state;
//   - the grabConn error text contains errTopicNotFount;
//   - the retry budget taken from options.maxReconnectToBroker is used up.
//
// A nil maxReconnectToBroker is represented as a budget of -1, which is never
// decremented and therefore never reaches the loop's stop value of 0.
func (pc *partitionConsumer) reconnectToBroker() {
	retriesLeft := -1
	if limit := pc.options.maxReconnectToBroker; limit != nil {
		retriesLeft = int(*limit)
	}

	// Used for the delay only when no backoff policy is configured in options.
	fallbackBackoff := internal.DefaultBackoff{}

	for retriesLeft != 0 {
		if pc.getConsumerState() != consumerReady {
			// Consumer is already closing
			pc.log.Info("consumer state not ready, exit reconnect")
			return
		}

		var wait time.Duration
		if policy := pc.options.backoffPolicy; policy != nil {
			wait = policy.Next()
		} else {
			wait = fallbackBackoff.Next()
		}

		pc.log.Info("Reconnecting to broker in ", wait)
		time.Sleep(wait)

		err := pc.grabConn()
		if err == nil {
			// Successfully reconnected
			pc.log.Info("Reconnected consumer to broker")
			return
		}
		pc.log.WithError(err).Error("Failed to create consumer at reconnect")

		if strings.Contains(err.Error(), errTopicNotFount) {
			// The topic is gone, so further attempts are pointless: give up.
			pc.log.Warn("Topic Not Found.")
			return
		}

		// Only a bounded budget is consumed; the -1 sentinel stays untouched.
		if retriesLeft > 0 {
			retriesLeft--
		}
		pc.metrics.ConsumersReconnectFailure.Inc()

		// NOTE(review): the max-backoff check always consults the local
		// fallback backoff, even when a configured backoffPolicy produced the
		// delay above — confirm this is intended.
		if retriesLeft == 0 || fallbackBackoff.IsMaxBackoffReached() {
			pc.metrics.ConsumersReconnectMaxRetry.Inc()
		}
	}
}