public synchronized void updateCurrentServingInstance()

in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManager.java [165:197]


  /**
   * Rebuilds {@code _currentServingInstance} from the live instances and the
   * instance-to-topic-partition assignment returned by {@code HelixUtils}.
   *
   * <p>Steps, all performed while holding both this object's monitor and the
   * monitor of {@code _currentServingInstance}:
   * <ol>
   *   <li>Create one holder per live instance that is not blacklisted.</li>
   *   <li>Attach the assigned topic partitions to the holders that exist.</li>
   *   <li>Clear {@code _currentServingInstance} and refill it, leaving out up to
   *       {@code instanceCount - maxWorkingInstances} instances that serve no
   *       topic partitions (standby hosts). When {@code maxWorkingInstances}
   *       is not positive, no instance is left out.</li>
   * </ol>
   */
  public synchronized void updateCurrentServingInstance() {
    synchronized (_currentServingInstance) {
      Map<String, Set<TopicPartition>> assignedPartitions =
          HelixUtils.getInstanceToTopicPartitionsMap(_helixZkManager);
      List<String> liveInstanceNames = HelixUtils.liveInstances(_helixZkManager);
      Set<String> blacklist = new HashSet<>(getBlacklistedInstances());

      // One empty holder for every live instance that is not blacklisted.
      Map<String, InstanceTopicPartitionHolder> holdersByName = new HashMap<>();
      for (String name : liveInstanceNames) {
        if (blacklist.contains(name)) {
          continue;
        }
        holdersByName.put(name, new InstanceTopicPartitionHolder(name));
      }

      // Assignments for instances without a holder (not live, or blacklisted) are ignored.
      for (Map.Entry<String, Set<TopicPartition>> assignment : assignedPartitions.entrySet()) {
        InstanceTopicPartitionHolder holder = holdersByName.get(assignment.getKey());
        if (holder != null) {
          holder.addTopicPartitions(assignment.getValue());
        }
      }

      _currentServingInstance.clear();

      // A non-positive limit means "no limit": nothing is held back as standby.
      // The difference may be negative when there are fewer instances than the
      // limit, which likewise results in no standby hosts.
      int standbyQuota = 0;
      if (_controllerConf.getMaxWorkingInstances() > 0) {
        standbyQuota = holdersByName.size() - _controllerConf.getMaxWorkingInstances();
      }

      int standbyCount = 0;
      for (InstanceTopicPartitionHolder candidate : holdersByName.values()) {
        boolean keepAsStandby =
            standbyCount < standbyQuota && candidate.getNumServingTopicPartitions() <= 0;
        if (keepAsStandby) {
          // Idle instance and quota not yet used up: exclude it as a standby host.
          standbyCount++;
          continue;
        }
        _currentServingInstance.add(candidate);
      }
    }
  }