public synchronized void updateCurrentStatus()

in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/core/WorkerHelixManager.java [96:140]


  public synchronized void updateCurrentStatus() {
    LOGGER.info("Trying to run worker updateCurrentStatus");
    _lock.lock();
    try {
      Map<TopicPartition, List<String>> currRouteToInstanceMap = new HashMap<>();
      List<String> currAvailableWorkerList = new ArrayList<>();

      Map<String, Set<TopicPartition>> instanceToTopicPartitionsMap = HelixUtils
          .getInstanceToTopicPartitionsMap(_helixManager);
      LOGGER.debug("For worker instanceToTopicPartitionsMap: {}", instanceToTopicPartitionsMap);
      List<String> liveInstances = HelixUtils.liveInstances(_helixManager);
      currAvailableWorkerList.addAll(liveInstances);

      Map<String, List<TopicPartition>> incorrectInstanceToTopicPartitionsMap = new HashMap<>();
      for (String instanceName : instanceToTopicPartitionsMap.keySet()) {
        Set<TopicPartition> topicPartitions = instanceToTopicPartitionsMap.get(instanceName);
        // TODO: one instance suppose to have only one partition
        for (TopicPartition tp : topicPartitions) {
          String pipeline = tp.getTopic();
          if (pipeline.startsWith(SEPARATOR)) {
            currRouteToInstanceMap.putIfAbsent(tp, new ArrayList<>());
            currRouteToInstanceMap.get(tp).add(instanceName);
            if (!currAvailableWorkerList.contains(instanceName)) {
              LOGGER.info("not contain {} in {}@{}", instanceName, tp.getTopic(), tp.getPartition());
            }
            currAvailableWorkerList.remove(instanceName);
          } else {
            incorrectInstanceToTopicPartitionsMap.putIfAbsent(instanceName, new ArrayList<>());
            incorrectInstanceToTopicPartitionsMap.get(instanceName).add(tp);
          }
        }
      }
      _routeToInstanceMap = currRouteToInstanceMap;
      _availableWorkerList = currAvailableWorkerList;
      if (!incorrectInstanceToTopicPartitionsMap.isEmpty()) {
        LOGGER.error("Validate WRONG: wrong incorrectInstanceToTopicPartitionsMap: {}", incorrectInstanceToTopicPartitionsMap);
      }
    } finally {
      _lock.unlock();
    }

    //LOGGER.info("For worker _routeToInstanceMap: {}", _routeToInstanceMap);
    //LOGGER.info("For worker {} available, _availableWorkerList: {}", _availableWorkerList.size(), _availableWorkerList);
    LOGGER.info("For worker {} available", _availableWorkerList.size());
  }