public void process()

in tephra-core/src/main/java/org/apache/tephra/zookeeper/TephraZKClientService.java [424:466]


  public void process(WatchedEvent event) {
    State state = state();
    if (state == State.TERMINATED || state == State.FAILED) {
      return;
    }

    try {
      if (event.getState() == Event.KeeperState.SyncConnected && state == State.STARTING) {
        LOG.debug("Connected to ZooKeeper: {}", zkStr);
        notifyStarted();
        return;
      }
      if (event.getState() == Event.KeeperState.Expired) {
        LOG.info("ZooKeeper session expired: {}", zkStr);

        // When connection expired, simply reconnect again
        if (state != State.RUNNING) {
          return;
        }
        eventExecutor.submit(new Runnable() {
          @Override
          public void run() {
            // Only reconnect if the current state is running
            if (state() != State.RUNNING) {
              return;
            }
            try {
              LOG.info("Reconnect to ZooKeeper due to expiration: {}", zkStr);
              closeZooKeeper(zooKeeper.getAndSet(createZooKeeper()));
            } catch (IOException e) {
              notifyFailed(e);
            }
          }
        });
      }
    } finally {
      if (event.getType() == Event.EventType.None) {
        for (Watcher connectionWatcher : connectionWatchers) {
          connectionWatcher.process(event);
        }
      }
    }
  }