func()

in pulsar/consumer_partition.go [1852:1920]


// reconnectToBroker repeatedly calls grabConn to re-attach this partition
// consumer, driving the attempts through internal.Retry with delays taken
// from the consumer's backoff policy.
//
// If connectionClosed is non-nil and reports a URL, that URL is handed to
// grabConn on the first attempt only; every later attempt passes "".
//
// The attempt closure reports "nothing more to do" (nil error) when:
//   - the retry budget is zero (options.maxReconnectToBroker; nil leaves the
//     budget at -1, which is never decremented),
//   - the consumer state is not consumerReady,
//   - grabConn succeeds, or
//   - the grabConn error text contains errMsgTopicNotFound.
//
// Any other failure is counted in the reconnect metrics and returned so the
// retry helper can schedule another attempt.
func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClosed) {
	if pc.isSeeking.CompareAndSwap(true, false) {
		pc.log.Debug("seek operation triggers reconnection, and reset isSeeking")
	}

	// Remaining attempts; -1 is the "no limit configured" sentinel.
	remaining := -1
	if limit := pc.options.maxReconnectToBroker; limit != nil {
		remaining = int(*limit)
	}
	backoff := pc.backoffPolicyFunc()

	// Broker to try first, when the close notification named one.
	brokerURL := ""
	if connectionClosed != nil && connectionClosed.HasURL() {
		brokerURL = connectionClosed.assignedBrokerURL
	}

	attempt := func() (struct{}, error) {
		var none struct{}

		if remaining == 0 {
			return none, nil
		}

		if pc.getConsumerState() != consumerReady {
			// Consumer is already closing
			pc.log.Info("consumer state not ready, exit reconnect")
			return none, nil
		}

		err := pc.grabConn(brokerURL)
		// The assigned broker is attempted just once; later tries pass "".
		brokerURL = ""

		if err == nil {
			// Successfully reconnected
			pc.log.Info("Reconnected consumer to broker")
			backoff.Reset()
			return none, nil
		}

		pc.log.WithError(err).Error("Failed to create consumer at reconnect")
		if strings.Contains(err.Error(), errMsgTopicNotFound) {
			// When the topic is deleted, give up reconnecting.
			pc.log.Warn("Topic Not Found.")
			return none, nil
		}

		// Only a positive budget is consumed; the -1 sentinel stays as is.
		if remaining > 0 {
			remaining--
		}
		pc.metrics.ConsumersReconnectFailure.Inc()
		if remaining == 0 || backoff.IsMaxBackoffReached() {
			pc.metrics.ConsumersReconnectMaxRetry.Inc()
		}

		return none, err
	}

	// nextDelay asks the backoff policy for the wait before the next attempt
	// and logs it together with the current value of brokerURL.
	nextDelay := func(_ error) time.Duration {
		wait := backoff.Next()
		pc.log.WithFields(log.Fields{
			"assignedBrokerURL":  brokerURL,
			"delayReconnectTime": wait,
		}).Info("Reconnecting to broker")
		return wait
	}

	// Both results are intentionally discarded: each outcome has already
	// been logged and counted inside attempt.
	_, _ = internal.Retry(pc.ctx, attempt, nextDelay)
}