in twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java [274:317]
/**
 * Creates a {@link Watcher} that reacts to ZK connection state changes. It remembers when the
 * session has expired and, on the next {@code SyncConnected} event after an expiration,
 * re-registers every entry currently held in {@code discoverables}.
 *
 * @return a new watcher instance carrying its own expiration flag
 */
private Watcher createConnectionWatcher() {
  return new Watcher() {
    // Watcher is invoked from single event thread, hence safe to use normal mutable variable.
    private boolean expired;

    @Override
    public void process(WatchedEvent event) {
      Event.KeeperState state = event.getState();

      if (state == Event.KeeperState.Expired) {
        LOG.warn("ZK Session expired: {}", zkClient.getConnectString());
        expired = true;
        return;
      }

      // Only a connect event that follows an expiration triggers re-registration.
      if (state != Event.KeeperState.SyncConnected || !expired) {
        return;
      }

      LOG.info("Reconnected after expiration: {}", zkClient.getConnectString());
      expired = false;
      reRegisterAll();
    }

    /**
     * Issues a registration for every known discoverable while holding {@code lock}.
     */
    private void reRegisterAll() {
      lock.lock();
      try {
        for (final Map.Entry<Discoverable, DiscoveryCancellable> registration : discoverables.entries()) {
          LOG.info("Re-registering service: {}", registration.getKey());

          // Must be non-blocking in here, hence the result is handled through a callback.
          Futures.addCallback(doRegister(registration.getKey()),
                              createPathUpdater(registration),
                              Threads.SAME_THREAD_EXECUTOR);
        }
      } finally {
        lock.unlock();
      }
    }

    /**
     * Builds the callback that applies the outcome of a re-registration to the cancellable
     * of the given entry.
     */
    private FutureCallback<String> createPathUpdater(
        final Map.Entry<Discoverable, DiscoveryCancellable> registration) {
      return new FutureCallback<String>() {
        @Override
        public void onSuccess(String result) {
          // Point the cancellable at the newly created sequential node.
          registration.getValue().setPath(result);
          LOG.debug("Service re-registered: {} {}", registration.getKey(), result);
        }

        @Override
        public void onFailure(Throwable t) {
          // No retry on failure; a null path makes the cancellable do nothing.
          registration.getValue().setPath(null);
          LOG.error("Failed to re-register service: {}", registration.getKey(), t);
        }
      };
    }
  };
}