private Watcher createConnectionWatcher()

in twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java [274:317]


  /**
   * Builds the {@link Watcher} that reacts to ZooKeeper connection state changes.
   *
   * <p>The watcher remembers when the session has expired. On the next
   * {@code SyncConnected} event after an expiration, it walks through every entry in
   * {@code discoverables} (while holding {@code lock}) and asynchronously registers each
   * one again through {@code doRegister}, pointing the associated cancellable at the
   * resulting path.
   *
   * @return a new watcher carrying its own expiration flag
   */
  private Watcher createConnectionWatcher() {
    return new Watcher() {
      // Plain (non-volatile) field: the watcher is invoked from a single event thread.
      private boolean expired;

      @Override
      public void process(WatchedEvent event) {
        final Event.KeeperState state = event.getState();

        if (state == Event.KeeperState.Expired) {
          LOG.warn("ZK Session expired: {}", zkClient.getConnectString());
          expired = true;
          return;
        }

        // Only a reconnect that follows an expiration requires any work.
        if (!expired || state != Event.KeeperState.SyncConnected) {
          return;
        }

        LOG.info("Reconnected after expiration: {}", zkClient.getConnectString());
        expired = false;
        reRegisterAll();
      }

      /**
       * Issues an asynchronous registration for every known discoverable while holding the lock.
       */
      private void reRegisterAll() {
        lock.lock();
        try {
          for (final Map.Entry<Discoverable, DiscoveryCancellable> entry : discoverables.entries()) {
            final Discoverable discoverable = entry.getKey();
            LOG.info("Re-registering service: {}", discoverable);

            FutureCallback<String> onCompletion = new FutureCallback<String>() {
              @Override
              public void onSuccess(String result) {
                // Point the cancellable at the newly created sequential node.
                entry.getValue().setPath(result);
                LOG.debug("Service re-registered: {} {}", discoverable, result);
              }

              @Override
              public void onFailure(Throwable t) {
                // Node creation failed: no retry is attempted; clearing the path makes the cancellable do nothing.
                entry.getValue().setPath(null);
                LOG.error("Failed to re-register service: {}", discoverable, t);
              }
            };

            // Nothing in here may block; the registration result is handled in the callback.
            Futures.addCallback(doRegister(discoverable), onCompletion, Threads.SAME_THREAD_EXECUTOR);
          }
        } finally {
          lock.unlock();
        }
      }
    };
  }