protected void checkLiveInstancesObservation()

in helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java [1360:1447]


  protected void checkLiveInstancesObservation(List<LiveInstance> liveInstances,
      NotificationContext changeContext) {

    // construct maps for current live-instances
    Map<String, LiveInstance> curInstances = new HashMap<>();
    Map<String, LiveInstance> curSessions = new HashMap<>();
    for (LiveInstance liveInstance : liveInstances) {
      curInstances.put(liveInstance.getInstanceName(), liveInstance);
      curSessions.put(liveInstance.getEphemeralOwner(), liveInstance);
    }

    // TODO: remove the synchronization here once we move this update into dataCache.
    synchronized (_lastSeenInstances) {
      Map<String, LiveInstance> lastInstances = _lastSeenInstances.get();
      Map<String, LiveInstance> lastSessions = _lastSeenSessions.get();

      HelixManager manager = changeContext.getManager();
      Builder keyBuilder = new Builder(manager.getClusterName());
      if (lastSessions != null) {
        for (String session : lastSessions.keySet()) {
          if (!curSessions.containsKey(session)) {
            // remove current-state listener for expired session
            String instanceName = lastSessions.get(session).getInstanceName();
            manager.removeListener(keyBuilder.currentStates(instanceName, session), this);
            manager.removeListener(keyBuilder.taskCurrentStates(instanceName, session), this);
          }
        }
      }

      if (lastInstances != null) {
        for (String instance : lastInstances.keySet()) {
          if (!curInstances.containsKey(instance)) {
            // remove message listener for disconnected instances
            manager.removeListener(keyBuilder.messages(instance), this);
            // remove customized state root listener for disconnected instances
            manager.removeListener(keyBuilder.customizedStatesRoot(instance), this);
          }
        }
      }

      for (String session : curSessions.keySet()) {
        if (lastSessions == null || !lastSessions.containsKey(session)) {
          String instanceName = curSessions.get(session).getInstanceName();
          try {
            // add current-state listeners for new sessions
            manager.addCurrentStateChangeListener(this, instanceName, session);
            manager.addTaskCurrentStateChangeListener(this, instanceName, session);
            logger.info(manager.getInstanceName() + " added current-state listener for instance: "
                + instanceName + ", session: " + session + ", listener: " + this);
          } catch (Exception e) {
            logger.error("Fail to add current state listener for instance: " + instanceName
                + " with session: " + session, e);
          }
        }
      }

      for (String instance : curInstances.keySet()) {
        if (lastInstances == null || !lastInstances.containsKey(instance)) {
          try {
            // add message listeners for new instances
            manager.addMessageListener(this, instance);
            logger.info(manager.getInstanceName() + " added message listener for " + instance
                + ", listener: " + this);
          } catch (Exception e) {
            logger.error("Fail to add message listener for instance: " + instance, e);
          }
        }
      }

        for (String instance : curInstances.keySet()) {
          if (lastInstances == null || !lastInstances.containsKey(instance)) {
            try {
              manager.addCustomizedStateRootChangeListener(this, instance);
              logger.info(manager.getInstanceName() + " added root path listener for customized "
                  + "state change for " + instance + ", listener: " + this);
            } catch (Exception e) {
              logger.error(
                  "Fail to add root path listener for customized state change for instance: "
                      + instance, e);
            }
        }
      }

      // update last-seen
      _lastSeenInstances.set(curInstances);
      _lastSeenSessions.set(curSessions);
    }
  }