private static void watchChanges()

in twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java [312:351]


  private static <T> void watchChanges(final Operation<T> operation, final String path,
                                       final Callback<T> callback, final AtomicBoolean cancelled) {
    Futures.addCallback(operation.exec(path, new Watcher() {
      @Override
      public void process(WatchedEvent event) {
        if (!cancelled.get()) {
          watchChanges(operation, path, callback, cancelled);
        }
      }
    }), new FutureCallback<T>() {
      @Override
      public void onSuccess(T result) {
        if (!cancelled.get()) {
          callback.updated(result);
        }
      }

      @Override
      public void onFailure(Throwable t) {
        if (t instanceof KeeperException && ((KeeperException) t).code() == KeeperException.Code.NONODE) {
          final SettableFuture<String> existCompletion = SettableFuture.create();
          existCompletion.addListener(new Runnable() {
            @Override
            public void run() {
              try {
                if (!cancelled.get()) {
                  watchChanges(operation, existCompletion.get(), callback, cancelled);
                }
              } catch (Exception e) {
                LOG.error("Failed to watch children for path " + path, e);
              }
            }
          }, Threads.SAME_THREAD_EXECUTOR);
          watchExists(operation.getZKClient(), path, existCompletion);
          return;
        }
        LOG.error("Failed to watch data for path " + path + " " + t, t);
      }
    });
  }