in twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java [462:504]
public void process(WatchedEvent event) {
  // Handles an event delivered to this watcher:
  //  - nothing at all happens once the service state is TERMINATED or FAILED;
  //  - SyncConnected while STARTING marks the service as started;
  //  - Expired while RUNNING schedules a reconnect on the event executor;
  //  - any event of type None is forwarded to every registered connection watcher.
  final State serviceState = state();
  final boolean alreadyStopped = serviceState == State.TERMINATED || serviceState == State.FAILED;
  if (alreadyStopped) {
    return;
  }

  try {
    final Event.KeeperState keeperState = event.getState();

    if (keeperState == Event.KeeperState.SyncConnected && serviceState == State.STARTING) {
      LOG.debug("Connected to ZooKeeper: {}", zkStr);
      notifyStarted();
    } else if (keeperState == Event.KeeperState.Expired) {
      LOG.info("ZooKeeper session expired: {}", zkStr);

      // A reconnect is only attempted when the service was running as of the state snapshot above.
      if (serviceState == State.RUNNING) {
        final Runnable reconnectTask = new Runnable() {
          @Override
          public void run() {
            // The state may have changed before this task got to run, so check it again.
            if (state() == State.RUNNING) {
              try {
                LOG.info("Reconnect to ZooKeeper due to expiration: {}", zkStr);
                // Install a newly created client and close whatever getAndSet hands back.
                closeZooKeeper(zooKeeper.getAndSet(createZooKeeper()));
              } catch (IOException e) {
                // Failing to create the replacement client fails the whole service.
                notifyFailed(e);
              }
            }
          }
        };
        eventExecutor.submit(reconnectTask);
      }
    }
  } finally {
    // Runs for every path through the try block above, including when it throws.
    if (event.getType() == Event.EventType.None) {
      for (Watcher watcher : connectionWatchers) {
        watcher.process(event);
      }
    }
  }
}