in twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java [132:195]
/**
 * Returns the list of brokers, setting up the ZooKeeper-backed broker list on the first call.
 *
 * <p>On the first invocation this registers a children watcher on {@code BROKER_IDS_PATH} (through
 * {@code actOnExists}), caches the resulting supplier in {@code brokerList}, and blocks until the
 * initial read attempt has completed. Subsequent invocations return the cached supplier's current value.
 *
 * @return the brokers as provided by the cached {@code brokerList} supplier
 * @throws IllegalStateException if this service is not running
 * @throws RuntimeException if the initial read fails or the calling thread is interrupted while waiting;
 *         in the latter case the thread's interrupt status is restored before throwing
 */
public synchronized Iterable<BrokerInfo> getBrokers() {
  Preconditions.checkState(isRunning(), "BrokerService is not running.");

  // Already initialized by an earlier call; serve from the cached supplier.
  if (brokerList != null) {
    return brokerList.get();
  }

  // Completed (or failed) by the first children read; later reads find it already set.
  final SettableFuture<?> readerFuture = SettableFuture.create();
  // Holds the latest broker list; starts out empty until the first successful read.
  final AtomicReference<Iterable<BrokerInfo>> brokers =
    new AtomicReference<Iterable<BrokerInfo>>(ImmutableList.<BrokerInfo>of());

  actOnExists(BROKER_IDS_PATH, new Runnable() {
    @Override
    public void run() {
      // Callback for fetching children list. This callback should be executed in the executorService.
      final FutureCallback<NodeChildren> childrenCallback = new FutureCallback<NodeChildren>() {
        @Override
        public void onSuccess(NodeChildren result) {
          try {
            // For each children node, get the BrokerInfo from the brokerInfo cache.
            brokers.set(
              ImmutableList.copyOf(
                Iterables.transform(
                  brokerInfos.getAll(Iterables.transform(result.getChildren(), BROKER_ID_TRANSFORMER)).values(),
                  Suppliers.<BrokerInfo>supplierFunction())));
            readerFuture.set(null);

            // Notify listeners only after the new list has been published.
            for (ListenerExecutor listener : listeners) {
              listener.changed(ZKBrokerService.this);
            }
          } catch (ExecutionException e) {
            // Surface the underlying cause of the cache lookup failure to the waiting caller.
            readerFuture.setException(e.getCause());
          }
        }

        @Override
        public void onFailure(Throwable t) {
          readerFuture.setException(t);
        }
      };

      // Fetch list of broker ids
      Futures.addCallback(zkClient.getChildren(BROKER_IDS_PATH, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
          if (!isRunning()) {
            return;
          }
          if (event.getType() == Event.EventType.NodeChildrenChanged) {
            // Read the children again, passing this same watcher along with the request.
            Futures.addCallback(zkClient.getChildren(BROKER_IDS_PATH, this), childrenCallback, executorService);
          }
        }
      }), childrenCallback, executorService);
    }
  }, readerFuture, FAILURE_RETRY_SECONDS, TimeUnit.SECONDS);

  // NOTE(review): the supplier is cached before the initial read completes, so a failed first read
  // still leaves brokerList set for later calls -- presumably intentional; confirm.
  brokerList = createSupplier(brokers);
  try {
    readerFuture.get();
  } catch (InterruptedException e) {
    // Restore the interrupt status so code further up the stack can still observe the interruption.
    Thread.currentThread().interrupt();
    throw Throwables.propagate(e);
  } catch (Exception e) {
    throw Throwables.propagate(e);
  }
  return brokerList.get();
}