private void actOnExists()

in twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java [301:343]


  private void actOnExists(final String path, final Runnable action,
                           final SettableFuture<?> readyFuture, final long retryTime, final TimeUnit retryUnit) {
    Futures.addCallback(zkClient.exists(path, new Watcher() {
      @Override
      public void process(WatchedEvent event) {
        if (!isRunning()) {
          return;
        }
        if (event.getType() == Event.EventType.NodeCreated) {
          action.run();
        }
      }
    }), new FutureCallback<Stat>() {
      @Override
      public void onSuccess(Stat result) {
        if (result != null) {
          action.run();
        } else {
          // If the node doesn't exists, treat it as ready. When the node becomes available later, data will be
          // fetched by the watcher.
          readyFuture.set(null);
        }
      }

      @Override
      public void onFailure(Throwable t) {
        // Retry the operation based on the retry time.
        Thread retryThread = new Thread("zk-broker-service-retry") {
          @Override
          public void run() {
            try {
              retryUnit.sleep(retryTime);
              actOnExists(path, action, readyFuture, retryTime, retryUnit);
            } catch (InterruptedException e) {
              LOG.warn("ZK retry thread interrupted. Action not retried.");
            }
          }
        };
        retryThread.setDaemon(true);
        retryThread.start();
      }
    }, executorService);
  }