in twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java [223:279]
public Supplier<T> load(final K key) throws Exception {
// A future to tell if the result is ready, even it is failure.
final SettableFuture<T> readyFuture = SettableFuture.create();
final AtomicReference<T> resultValue = new AtomicReference<T>();
// Fetch for node data when it exists.
final String path = key.getPath();
actOnExists(path, new Runnable() {
@Override
public void run() {
// Callback for getData call
final FutureCallback<NodeData> dataCallback = new FutureCallback<NodeData>() {
@Override
public void onSuccess(NodeData result) {
// Update with latest data
T value = decodeNodeData(result, resultType);
resultValue.set(value);
readyFuture.set(value);
}
@Override
public void onFailure(Throwable t) {
LOG.error("Failed to fetch node data on {}", path, t);
if (t instanceof KeeperException.NoNodeException) {
resultValue.set(null);
readyFuture.set(null);
return;
}
// On error, simply invalidate the key so that it'll be fetched next time.
invalidater.invalidate(key);
readyFuture.setException(t);
}
};
// Fetch node data
Futures.addCallback(zkClient.getData(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (!isRunning()) {
return;
}
if (event.getType() == Event.EventType.NodeDataChanged) {
// If node data changed, fetch it again.
Futures.addCallback(zkClient.getData(path, this), dataCallback, executorService);
} else if (event.getType() == Event.EventType.NodeDeleted) {
// If node removed, invalidate the cached value.
brokerInfos.invalidate(key);
}
}
}), dataCallback, executorService);
}
}, readyFuture, FAILURE_RETRY_SECONDS, TimeUnit.SECONDS);
readyFuture.get();
return createSupplier(resultValue);
}