public synchronized Iterable getBrokers()

in twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java [132:195]


  public synchronized Iterable<BrokerInfo> getBrokers() {
    Preconditions.checkState(isRunning(), "BrokerService is not running.");

    if (brokerList != null) {
      return brokerList.get();
    }

    final SettableFuture<?> readerFuture = SettableFuture.create();
    final AtomicReference<Iterable<BrokerInfo>> brokers =
      new AtomicReference<Iterable<BrokerInfo>>(ImmutableList.<BrokerInfo>of());

    actOnExists(BROKER_IDS_PATH, new Runnable() {
      @Override
      public void run() {
        // Callback for fetching children list. This callback should be executed in the executorService.
        final FutureCallback<NodeChildren> childrenCallback = new FutureCallback<NodeChildren>() {
          @Override
          public void onSuccess(NodeChildren result) {
            try {
              // For each children node, get the BrokerInfo from the brokerInfo cache.
              brokers.set(
                ImmutableList.copyOf(
                  Iterables.transform(
                    brokerInfos.getAll(Iterables.transform(result.getChildren(), BROKER_ID_TRANSFORMER)).values(),
                    Suppliers.<BrokerInfo>supplierFunction())));
              readerFuture.set(null);

              for (ListenerExecutor listener : listeners) {
                listener.changed(ZKBrokerService.this);
              }
            } catch (ExecutionException e) {
              readerFuture.setException(e.getCause());
            }
          }

          @Override
          public void onFailure(Throwable t) {
            readerFuture.setException(t);
          }
        };

        // Fetch list of broker ids
        Futures.addCallback(zkClient.getChildren(BROKER_IDS_PATH, new Watcher() {
          @Override
          public void process(WatchedEvent event) {
            if (!isRunning()) {
              return;
            }
            if (event.getType() == Event.EventType.NodeChildrenChanged) {
              Futures.addCallback(zkClient.getChildren(BROKER_IDS_PATH, this), childrenCallback, executorService);
            }
          }
        }), childrenCallback, executorService);
      }
    }, readerFuture, FAILURE_RETRY_SECONDS, TimeUnit.SECONDS);

    brokerList = createSupplier(brokers);
    try {
      readerFuture.get();
    } catch (Exception e) {
      throw Throwables.propagate(e);
    }
    return brokerList.get();
  }