in client.go [1213:1289]
func (client *client) findCoordinator(coordinatorKey string, coordinatorType CoordinatorType, attemptsRemaining int) (*FindCoordinatorResponse, error) {
retry := func(err error) (*FindCoordinatorResponse, error) {
if attemptsRemaining > 0 {
backoff := client.computeBackoff(attemptsRemaining)
attemptsRemaining--
Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
time.Sleep(backoff)
return client.findCoordinator(coordinatorKey, coordinatorType, attemptsRemaining)
}
return nil, err
}
brokerErrors := make([]error, 0)
for broker := client.LeastLoadedBroker(); broker != nil; broker = client.LeastLoadedBroker() {
DebugLogger.Printf("client/coordinator requesting coordinator for %s from %s\n", coordinatorKey, broker.Addr())
request := new(FindCoordinatorRequest)
request.CoordinatorKey = coordinatorKey
request.CoordinatorType = coordinatorType
// Version 1 adds KeyType.
if client.conf.Version.IsAtLeast(V0_11_0_0) {
request.Version = 1
}
// Version 2 is the same as version 1.
if client.conf.Version.IsAtLeast(V2_0_0_0) {
request.Version = 2
}
response, err := broker.FindCoordinator(request)
if err != nil {
Logger.Printf("client/coordinator request to broker %s failed: %s\n", broker.Addr(), err)
var packetEncodingError PacketEncodingError
if errors.As(err, &packetEncodingError) {
return nil, err
} else {
_ = broker.Close()
brokerErrors = append(brokerErrors, err)
client.deregisterBroker(broker)
continue
}
}
if errors.Is(response.Err, ErrNoError) {
DebugLogger.Printf("client/coordinator coordinator for %s is #%d (%s)\n", coordinatorKey, response.Coordinator.ID(), response.Coordinator.Addr())
return response, nil
} else if errors.Is(response.Err, ErrConsumerCoordinatorNotAvailable) {
Logger.Printf("client/coordinator coordinator for %s is not available\n", coordinatorKey)
// This is very ugly, but this scenario will only happen once per cluster.
// The __consumer_offsets topic only has to be created one time.
// The number of partitions not configurable, but partition 0 should always exist.
if _, err := client.Leader("__consumer_offsets", 0); err != nil {
Logger.Printf("client/coordinator the __consumer_offsets topic is not initialized completely yet. Waiting 2 seconds...\n")
time.Sleep(2 * time.Second)
}
if coordinatorType == CoordinatorTransaction {
if _, err := client.Leader("__transaction_state", 0); err != nil {
Logger.Printf("client/coordinator the __transaction_state topic is not initialized completely yet. Waiting 2 seconds...\n")
time.Sleep(2 * time.Second)
}
}
return retry(ErrConsumerCoordinatorNotAvailable)
} else if errors.Is(response.Err, ErrGroupAuthorizationFailed) {
Logger.Printf("client was not authorized to access group %s while attempting to find coordinator", coordinatorKey)
return retry(ErrGroupAuthorizationFailed)
} else {
return nil, response.Err
}
}
Logger.Println("client/coordinator no available broker to send consumer metadata request to")
client.resurrectDeadBrokers()
return retry(Wrap(ErrOutOfBrokers, brokerErrors...))
}