func()

in pulsar/producer_partition.go [474:559]


func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed) {
	var (
		maxRetry int
	)
	if p.options.MaxReconnectToBroker == nil {
		maxRetry = -1
	} else {
		maxRetry = int(*p.options.MaxReconnectToBroker)
	}

	bo := p.backOffPolicyFunc()

	var assignedBrokerURL string
	if connectionClosed != nil && connectionClosed.HasURL() {
		assignedBrokerURL = connectionClosed.assignedBrokerURL
	}

	opFn := func() (struct{}, error) {
		if maxRetry == 0 {
			return struct{}{}, nil
		}

		if p.getProducerState() != producerReady {
			// Producer is already closing
			p.log.Info("producer state not ready, exit reconnect")
			return struct{}{}, nil
		}

		atomic.AddUint64(&p.epoch, 1)
		err := p.grabCnx(assignedBrokerURL)
		if assignedBrokerURL != "" {
			// Only attempt once
			assignedBrokerURL = ""
		}
		if err == nil {
			// Successfully reconnected
			p.log.WithField("cnx", p._getConn().ID()).Info("Reconnected producer to broker")
			bo.Reset()
			return struct{}{}, nil
		}
		p.log.WithError(err).Error("Failed to create producer at reconnect")
		errMsg := err.Error()
		if strings.Contains(errMsg, errMsgTopicNotFound) {
			// when topic is deleted, we should give up reconnection.
			p.log.Warn("Topic not found, stop reconnecting, close the producer")
			p.doClose(errors.Join(ErrTopicNotfound, err))
			return struct{}{}, nil
		}

		if strings.Contains(errMsg, errMsgTopicTerminated) {
			p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting, close the producer")
			p.doClose(errors.Join(ErrTopicTerminated, err))
			return struct{}{}, nil
		}

		if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
			p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting")
			p.failPendingMessages(errors.Join(ErrProducerBlockedQuotaExceeded, err))
			return struct{}{}, nil
		}

		if strings.Contains(errMsg, errMsgProducerFenced) {
			p.log.Warn("Producer was fenced, failing pending messages, stop reconnecting")
			p.doClose(errors.Join(ErrProducerFenced, err))
			return struct{}{}, nil
		}

		if maxRetry > 0 {
			maxRetry--
		}
		p.metrics.ProducersReconnectFailure.Inc()
		if maxRetry == 0 || bo.IsMaxBackoffReached() {
			p.metrics.ProducersReconnectMaxRetry.Inc()
		}

		return struct{}{}, err
	}
	_, _ = internal.Retry(p.ctx, opFn, func(_ error) time.Duration {
		delayReconnectTime := bo.Next()
		p.log.WithFields(log.Fields{
			"assignedBrokerURL":  assignedBrokerURL,
			"delayReconnectTime": delayReconnectTime,
		}).Info("Reconnecting to broker")
		return delayReconnectTime
	})
}