in client.go [988:1109]
// tryRefreshMetadata asks brokers, one at a time, for metadata about topics
// (or about every topic when topics is empty) and hands the first usable
// response to client.updateMetadata.
//
// Brokers are obtained from client.LeastLoadedBroker. A broker that replies
// with an empty broker list, with a KError that is not returned immediately,
// or with any other unclassified error is closed and deregistered before the
// next broker is tried. Packet encoding errors, the SASL/authorization KErrors
// checked below, and the handshake/EOF errors checked below are returned to
// the caller right away without trying further brokers.
//
// attemptsRemaining is the number of retries still allowed: each retry sleeps
// for client.computeBackoff(attemptsRemaining) and then recurses with one
// attempt fewer. deadline bounds the operation; the zero time.Time disables
// the deadline check.
//
// The result is whatever client.updateMetadata returned (possibly nil), one of
// the immediately-returned errors above, or ErrOutOfBrokers wrapped via Wrap
// together with the errors collected from the "other error" branch.
func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
	// pastDeadline reports whether waiting backoff from now would overshoot
	// deadline. A zero deadline never expires.
	pastDeadline := func(backoff time.Duration) bool {
		return !deadline.IsZero() && time.Now().Add(backoff).After(deadline)
	}

	// retry sleeps for the computed backoff and re-enters tryRefreshMetadata
	// with one attempt fewer, or returns err unchanged when no attempts are
	// left or the backoff would overshoot the deadline.
	retry := func(err error) error {
		if attemptsRemaining <= 0 {
			return err
		}
		backoff := client.computeBackoff(attemptsRemaining)
		if pastDeadline(backoff) {
			Logger.Println("client/metadata skipping last retries as we would go past the metadata timeout")
			return err
		}
		if backoff > 0 {
			time.Sleep(backoff)
		}
		// updateMetadataMs is stored right before every GetMetadata call in the
		// loop below. If it is more recent than our backoff, a request was
		// issued while we slept, so give up on this retry.
		// NOTE(review): presumably that request comes from a concurrent
		// refresh — confirm against the other writers of updateMetadataMs.
		t := atomic.LoadInt64(&client.updateMetadataMs)
		if time.Since(time.UnixMilli(t)) < backoff {
			return err
		}
		attemptsRemaining--
		Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
		return client.tryRefreshMetadata(topics, attemptsRemaining, deadline)
	}

	broker := client.LeastLoadedBroker()
	brokerErrors := make([]error, 0)
	for ; broker != nil && !pastDeadline(0); broker = client.LeastLoadedBroker() {
		allowAutoTopicCreation := client.conf.Metadata.AllowAutoTopicCreation
		if len(topics) > 0 {
			DebugLogger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
		} else {
			// Fetching all topics: never ask the broker to auto-create anything.
			allowAutoTopicCreation = false
			DebugLogger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr)
		}

		req := NewMetadataRequest(client.conf.Version, topics)
		req.AllowAutoTopicCreation = allowAutoTopicCreation
		// Record when this request was issued; retry() reads this timestamp.
		atomic.StoreInt64(&client.updateMetadataMs, time.Now().UnixMilli())

		response, err := broker.GetMetadata(req)
		var kerror KError
		var packetEncodingError PacketEncodingError
		if err == nil {
			// When talking to the startup phase of a broker, it is possible to receive an empty metadata set. We should remove that broker and try next broker (https://issues.apache.org/jira/browse/KAFKA-7924).
			if len(response.Brokers) == 0 {
				Logger.Printf("client/metadata receiving empty brokers from the metadata response when requesting the broker #%d at %s\n", broker.ID(), broker.addr)
				_ = broker.Close() // best effort: the broker is being dropped anyway
				client.deregisterBroker(broker)
				continue
			}
			allKnownMetaData := len(topics) == 0
			// valid response, use it
			shouldRetry, updateErr := client.updateMetadata(response, allKnownMetaData)
			if shouldRetry {
				Logger.Println("client/metadata found some partitions to be leaderless")
				return retry(updateErr) // note: updateErr can be nil
			}
			return updateErr
		} else if errors.As(err, &packetEncodingError) {
			// didn't even send, return the error
			return err
		} else if errors.As(err, &kerror) {
			// if SASL auth error return as this _should_ be a non retryable err for all brokers
			if errors.Is(err, ErrSASLAuthenticationFailed) {
				Logger.Println("client/metadata failed SASL authentication")
				return err
			}
			if errors.Is(err, ErrTopicAuthorizationFailed) {
				Logger.Println("client is not authorized to access this topic. The topics were: ", topics)
				return err
			}
			if errors.Is(err, ErrUnsupportedSASLMechanism) {
				Logger.Println("requested SASL mechanism is not supported by the broker")
				return err
			}
			// else remove that broker and try again
			Logger.Printf("client/metadata got kafka error from broker %d while fetching metadata: %v\n", broker.ID(), err)
			_ = broker.Close() // best effort: the broker is being dropped anyway
			client.deregisterBroker(broker)
		} else {
			if errors.Is(err, ErrSASLHandshakeReadEOF) ||
				errors.Is(err, ErrSASLHandshakeSendEOF) ||
				errors.Is(err, ErrFetchMetadataEOF) ||
				errors.Is(err, ErrBadTLSHandshake) {
				// These errors are typically unrecoverable, so we return them
				// directly here to avoid falling back on the less useful
				// "client has run out of brokers" error after retrying.
				//
				// Beats-specific note: if these errors arise, the connection
				// will still be retried, so this will not break things if the
				// error is temporary; it will just retry at the user-configured
				// rate, and with more informative log messages.
				return err
			}
			// some other error, remove that broker and try again
			Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
			brokerErrors = append(brokerErrors, err)
			_ = broker.Close() // best effort: the broker is being dropped anyway
			client.deregisterBroker(broker)
		}
	}

	outOfBrokersErr := Wrap(ErrOutOfBrokers, brokerErrors...)
	if broker != nil {
		// A broker was still available, so the loop stopped on the deadline
		// check rather than because the broker list was exhausted.
		Logger.Printf("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
		return retry(outOfBrokersErr)
	}

	Logger.Println("client/metadata no available broker to send metadata request to")
	client.resurrectDeadBrokers()
	return retry(outOfBrokersErr)
}