in helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java [1360:1447]
/**
 * Reconciles the listeners this controller holds against the given list of live instances.
 *
 * <p>The supplied live instances are indexed twice: by instance name and by ephemeral owner
 * (used here as the session key). Those two maps are compared with the maps recorded by the
 * previous invocation:
 * <ul>
 *   <li>sessions that were seen before but are absent now have their current-state and
 *       task-current-state listeners removed;</li>
 *   <li>instances that were seen before but are absent now have their message and
 *       customized-state root listeners removed;</li>
 *   <li>sessions that were not seen before get current-state and task-current-state
 *       listeners added;</li>
 *   <li>instances that were not seen before get a message listener and a customized-state
 *       root listener added.</li>
 * </ul>
 * Failures while adding a listener are logged and not rethrown. The freshly built maps are
 * stored as the new last-seen state at the end regardless of such failures.
 *
 * <p>The comparison and the update run while holding the monitor of
 * {@code _lastSeenInstances}.
 *
 * @param liveInstances the live instances to reconcile against
 * @param changeContext notification context; its manager is used to add and remove listeners
 */
protected void checkLiveInstancesObservation(List<LiveInstance> liveInstances,
    NotificationContext changeContext) {
  // Index the incoming live instances by name and by session (ephemeral owner).
  Map<String, LiveInstance> nowByInstance = new HashMap<>();
  Map<String, LiveInstance> nowBySession = new HashMap<>();
  for (LiveInstance live : liveInstances) {
    nowByInstance.put(live.getInstanceName(), live);
    nowBySession.put(live.getEphemeralOwner(), live);
  }

  // TODO: remove the synchronization here once we move this update into dataCache.
  synchronized (_lastSeenInstances) {
    Map<String, LiveInstance> prevByInstance = _lastSeenInstances.get();
    Map<String, LiveInstance> prevBySession = _lastSeenSessions.get();

    HelixManager manager = changeContext.getManager();
    Builder keyBuilder = new Builder(manager.getClusterName());

    // Sessions that disappeared: drop their current-state and task-current-state listeners.
    if (prevBySession != null) {
      for (Map.Entry<String, LiveInstance> previous : prevBySession.entrySet()) {
        String session = previous.getKey();
        if (nowBySession.containsKey(session)) {
          continue;
        }
        String instanceName = previous.getValue().getInstanceName();
        manager.removeListener(keyBuilder.currentStates(instanceName, session), this);
        manager.removeListener(keyBuilder.taskCurrentStates(instanceName, session), this);
      }
    }

    // Instances that disappeared: drop their message and customized-state root listeners.
    if (prevByInstance != null) {
      for (String instance : prevByInstance.keySet()) {
        if (nowByInstance.containsKey(instance)) {
          continue;
        }
        manager.removeListener(keyBuilder.messages(instance), this);
        manager.removeListener(keyBuilder.customizedStatesRoot(instance), this);
      }
    }

    // Sessions not seen last time: attach current-state and task-current-state listeners.
    for (Map.Entry<String, LiveInstance> current : nowBySession.entrySet()) {
      String session = current.getKey();
      boolean seenBefore = prevBySession != null && prevBySession.containsKey(session);
      if (seenBefore) {
        continue;
      }
      String instanceName = current.getValue().getInstanceName();
      try {
        manager.addCurrentStateChangeListener(this, instanceName, session);
        manager.addTaskCurrentStateChangeListener(this, instanceName, session);
        logger.info(manager.getInstanceName() + " added current-state listener for instance: "
            + instanceName + ", session: " + session + ", listener: " + this);
      } catch (Exception e) {
        // Logged only; the remaining sessions are still processed.
        logger.error("Fail to add current state listener for instance: " + instanceName
            + " with session: " + session, e);
      }
    }

    // Instances not seen last time: attach message listeners (first pass).
    for (String instance : nowByInstance.keySet()) {
      boolean seenBefore = prevByInstance != null && prevByInstance.containsKey(instance);
      if (seenBefore) {
        continue;
      }
      try {
        manager.addMessageListener(this, instance);
        logger.info(manager.getInstanceName() + " added message listener for " + instance
            + ", listener: " + this);
      } catch (Exception e) {
        logger.error("Fail to add message listener for instance: " + instance, e);
      }
    }

    // Instances not seen last time: attach customized-state root listeners (second pass).
    for (String instance : nowByInstance.keySet()) {
      boolean seenBefore = prevByInstance != null && prevByInstance.containsKey(instance);
      if (seenBefore) {
        continue;
      }
      try {
        manager.addCustomizedStateRootChangeListener(this, instance);
        logger.info(manager.getInstanceName() + " added root path listener for customized "
            + "state change for " + instance + ", listener: " + this);
      } catch (Exception e) {
        logger.error(
            "Fail to add root path listener for customized state change for instance: "
                + instance, e);
      }
    }

    // Remember what was observed this round for the next comparison.
    _lastSeenInstances.set(nowByInstance);
    _lastSeenSessions.set(nowBySession);
  }
}