func()

in client.go [1213:1289]


// findCoordinator asks brokers, one at a time in the order handed out by
// LeastLoadedBroker, which broker coordinates coordinatorKey for the given
// coordinatorType.
//
// Outcomes:
//   - A response carrying ErrNoError is returned to the caller.
//   - A request that fails with a PacketEncodingError aborts the lookup and
//     the error is returned immediately.
//   - Any other request failure closes and deregisters that broker, records
//     the error, and moves on to the next broker.
//   - ErrConsumerCoordinatorNotAvailable, ErrGroupAuthorizationFailed, and
//     running out of brokers go through the retry path, which sleeps for the
//     computed backoff and calls findCoordinator again while attemptsRemaining
//     is positive; once it is exhausted the triggering error is returned.
//   - Any other response error is returned unchanged.
func (client *client) findCoordinator(coordinatorKey string, coordinatorType CoordinatorType, attemptsRemaining int) (*FindCoordinatorResponse, error) {
	// retry re-enters findCoordinator after a backoff, or gives up with cause
	// when no attempts are left.
	retry := func(cause error) (*FindCoordinatorResponse, error) {
		if attemptsRemaining <= 0 {
			return nil, cause
		}

		// The backoff is computed from the count before it is decremented;
		// the log line reports the count after.
		delay := client.computeBackoff(attemptsRemaining)
		attemptsRemaining--
		Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", delay/time.Millisecond, attemptsRemaining)
		time.Sleep(delay)
		return client.findCoordinator(coordinatorKey, coordinatorType, attemptsRemaining)
	}

	brokerErrors := make([]error, 0)
	for {
		broker := client.LeastLoadedBroker()
		if broker == nil {
			break
		}

		DebugLogger.Printf("client/coordinator requesting coordinator for %s from %s\n", coordinatorKey, broker.Addr())

		request := &FindCoordinatorRequest{
			CoordinatorKey:  coordinatorKey,
			CoordinatorType: coordinatorType,
		}
		// Version 1 adds KeyType.
		if client.conf.Version.IsAtLeast(V0_11_0_0) {
			request.Version = 1
		}
		// Version 2 is the same as version 1.
		if client.conf.Version.IsAtLeast(V2_0_0_0) {
			request.Version = 2
		}

		response, reqErr := broker.FindCoordinator(request)
		if reqErr != nil {
			Logger.Printf("client/coordinator request to broker %s failed: %s\n", broker.Addr(), reqErr)

			// An encoding failure is returned as-is without trying another broker.
			var encodingErr PacketEncodingError
			if errors.As(reqErr, &encodingErr) {
				return nil, reqErr
			}

			// Otherwise drop this broker and try the next one.
			_ = broker.Close()
			brokerErrors = append(brokerErrors, reqErr)
			client.deregisterBroker(broker)
			continue
		}

		switch {
		case errors.Is(response.Err, ErrNoError):
			DebugLogger.Printf("client/coordinator coordinator for %s is #%d (%s)\n", coordinatorKey, response.Coordinator.ID(), response.Coordinator.Addr())
			return response, nil

		case errors.Is(response.Err, ErrConsumerCoordinatorNotAvailable):
			Logger.Printf("client/coordinator coordinator for %s is not available\n", coordinatorKey)

			// Ugly, but this should only happen once per cluster: the
			// __consumer_offsets topic only has to be created one time. Its
			// partition count is not configurable here, but partition 0
			// should always exist, so probe that one.
			if _, leaderErr := client.Leader("__consumer_offsets", 0); leaderErr != nil {
				Logger.Printf("client/coordinator the __consumer_offsets topic is not initialized completely yet. Waiting 2 seconds...\n")
				time.Sleep(2 * time.Second)
			}
			// Transaction coordinators get the same probe against
			// __transaction_state.
			if coordinatorType == CoordinatorTransaction {
				if _, leaderErr := client.Leader("__transaction_state", 0); leaderErr != nil {
					Logger.Printf("client/coordinator the __transaction_state topic is not initialized completely yet. Waiting 2 seconds...\n")
					time.Sleep(2 * time.Second)
				}
			}

			return retry(ErrConsumerCoordinatorNotAvailable)

		case errors.Is(response.Err, ErrGroupAuthorizationFailed):
			Logger.Printf("client was not authorized to access group %s while attempting to find coordinator", coordinatorKey)
			return retry(ErrGroupAuthorizationFailed)

		default:
			return nil, response.Err
		}
	}

	// Every broker has been tried and dropped: bring the dead ones back before
	// retrying with the collected per-broker errors attached.
	Logger.Println("client/coordinator no available broker to send consumer metadata request to")
	client.resurrectDeadBrokers()
	outOfBrokers := Wrap(ErrOutOfBrokers, brokerErrors...)
	return retry(outOfBrokers)
}