in twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java [312:351]
/**
 * Executes the given watch operation on a path and keeps re-executing it every time the
 * registered {@link Watcher} fires, until {@code cancelled} is set to {@code true}.
 *
 * <p>Each successful execution hands its result to {@code callback}. If the execution fails
 * with {@link KeeperException.Code#NONODE}, the method delegates to {@code watchExists} and
 * restarts the watch once the supplied future completes (presumably when the node gets
 * created - confirm against {@code watchExists}). Any other failure is logged and the watch
 * is not re-established by this method.
 *
 * @param operation the operation to execute; it is given the path and the watcher to register
 * @param path the path to operate on
 * @param callback receives the result of every successful execution made while not cancelled
 * @param cancelled flag checked before every callback invocation and every re-watch; once it
 *                  is {@code true} no further callback or re-watch is triggered from here
 * @param <T> type of the result produced by the operation
 */
private static <T> void watchChanges(final Operation<T> operation, final String path,
                                     final Callback<T> callback, final AtomicBoolean cancelled) {
  Futures.addCallback(operation.exec(path, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
      // The watcher fired: run the operation again, which also registers a fresh watcher.
      if (!cancelled.get()) {
        watchChanges(operation, path, callback, cancelled);
      }
    }
  }), new FutureCallback<T>() {
    @Override
    public void onSuccess(T result) {
      if (!cancelled.get()) {
        callback.updated(result);
      }
    }

    @Override
    public void onFailure(Throwable t) {
      if (t instanceof KeeperException && ((KeeperException) t).code() == KeeperException.Code.NONODE) {
        // The node is missing: instead of giving up, wait on watchExists and resume afterwards.
        final SettableFuture<String> existCompletion = SettableFuture.create();
        existCompletion.addListener(new Runnable() {
          @Override
          public void run() {
            try {
              if (!cancelled.get()) {
                // Resume using the path the future was completed with; get() rethrows
                // the failure if the future completed exceptionally.
                watchChanges(operation, existCompletion.get(), callback, cancelled);
              }
            } catch (Exception e) {
              // This method serves any Operation type, so the message stays operation-neutral.
              LOG.error("Failed to watch changes for path " + path, e);
            }
          }
        }, Threads.SAME_THREAD_EXECUTOR);
        watchExists(operation.getZKClient(), path, existCompletion);
        return;
      }
      // Any other failure is not retried here; only report it.
      LOG.error("Failed to watch changes for path " + path + " " + t, t);
    }
  });
}