in twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java [301:343]
private void actOnExists(final String path, final Runnable action,
final SettableFuture<?> readyFuture, final long retryTime, final TimeUnit retryUnit) {
Futures.addCallback(zkClient.exists(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (!isRunning()) {
return;
}
if (event.getType() == Event.EventType.NodeCreated) {
action.run();
}
}
}), new FutureCallback<Stat>() {
@Override
public void onSuccess(Stat result) {
if (result != null) {
action.run();
} else {
// If the node doesn't exists, treat it as ready. When the node becomes available later, data will be
// fetched by the watcher.
readyFuture.set(null);
}
}
@Override
public void onFailure(Throwable t) {
// Retry the operation based on the retry time.
Thread retryThread = new Thread("zk-broker-service-retry") {
@Override
public void run() {
try {
retryUnit.sleep(retryTime);
actOnExists(path, action, readyFuture, retryTime, retryUnit);
} catch (InterruptedException e) {
LOG.warn("ZK retry thread interrupted. Action not retried.");
}
}
};
retryThread.setDaemon(true);
retryThread.start();
}
}, executorService);
}