func()

in client.go [988:1109]


func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
	pastDeadline := func(backoff time.Duration) bool {
		if !deadline.IsZero() && time.Now().Add(backoff).After(deadline) {
			// we are past the deadline
			return true
		}
		return false
	}
	retry := func(err error) error {
		if attemptsRemaining > 0 {
			backoff := client.computeBackoff(attemptsRemaining)
			if pastDeadline(backoff) {
				Logger.Println("client/metadata skipping last retries as we would go past the metadata timeout")
				return err
			}
			if backoff > 0 {
				time.Sleep(backoff)
			}

			t := atomic.LoadInt64(&client.updateMetadataMs)
			if time.Since(time.UnixMilli(t)) < backoff {
				return err
			}
			attemptsRemaining--
			Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)

			return client.tryRefreshMetadata(topics, attemptsRemaining, deadline)
		}
		return err
	}

	broker := client.LeastLoadedBroker()
	brokerErrors := make([]error, 0)
	for ; broker != nil && !pastDeadline(0); broker = client.LeastLoadedBroker() {
		allowAutoTopicCreation := client.conf.Metadata.AllowAutoTopicCreation
		if len(topics) > 0 {
			DebugLogger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
		} else {
			allowAutoTopicCreation = false
			DebugLogger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr)
		}

		req := NewMetadataRequest(client.conf.Version, topics)
		req.AllowAutoTopicCreation = allowAutoTopicCreation
		atomic.StoreInt64(&client.updateMetadataMs, time.Now().UnixMilli())

		response, err := broker.GetMetadata(req)
		var kerror KError
		var packetEncodingError PacketEncodingError
		if err == nil {
			// When talking to the startup phase of a broker, it is possible to receive an empty metadata set. We should remove that broker and try next broker (https://issues.apache.org/jira/browse/KAFKA-7924).
			if len(response.Brokers) == 0 {
				Logger.Println("client/metadata receiving empty brokers from the metadata response when requesting the broker #%d at %s", broker.ID(), broker.addr)
				_ = broker.Close()
				client.deregisterBroker(broker)
				continue
			}
			allKnownMetaData := len(topics) == 0
			// valid response, use it
			shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
			if shouldRetry {
				Logger.Println("client/metadata found some partitions to be leaderless")
				return retry(err) // note: err can be nil
			}
			return err
		} else if errors.As(err, &packetEncodingError) {
			// didn't even send, return the error
			return err
		} else if errors.As(err, &kerror) {
			// if SASL auth error return as this _should_ be a non retryable err for all brokers
			if errors.Is(err, ErrSASLAuthenticationFailed) {
				Logger.Println("client/metadata failed SASL authentication")
				return err
			}

			if errors.Is(err, ErrTopicAuthorizationFailed) {
				Logger.Println("client is not authorized to access this topic. The topics were: ", topics)
				return err
			}
			if errors.Is(err, ErrUnsupportedSASLMechanism) {
				Logger.Println("requested SASL mechanism is not supported by the broker")
				return err
			}

			// else remove that broker and try again
			Logger.Printf("client/metadata got kafka error from broker %d while fetching metadata: %v\n", broker.ID(), err)
			_ = broker.Close()
			client.deregisterBroker(broker)
		} else {
			if errors.Is(err, ErrSASLHandshakeReadEOF) ||
				errors.Is(err, ErrSASLHandshakeSendEOF) ||
				errors.Is(err, ErrFetchMetadataEOF) ||
				errors.Is(err, ErrBadTLSHandshake) {
				// These errors are typically unrecoverable, so we return them
				// directly here to avoid falling back on the less useful
				// "client has run out of brokers" error after retrying.
				//
				// Beats-specific note: if these errors arise, the connection
				// will still be retried, so this will not break things if the
				// error is temporary; it will just retry at the user-configured
				// rate, and with more informative log messages.
				return err
			}

			// some other error, remove that broker and try again
			Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
			brokerErrors = append(brokerErrors, err)
			_ = broker.Close()
			client.deregisterBroker(broker)
		}
	}

	error := Wrap(ErrOutOfBrokers, brokerErrors...)
	if broker != nil {
		Logger.Printf("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
		return retry(error)
	}

	Logger.Println("client/metadata no available broker to send metadata request to")
	client.resurrectDeadBrokers()
	return retry(error)
}