in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/core/WorkerHelixManager.java [96:140]
/**
 * Rebuilds the cached worker view ({@code _routeToInstanceMap} and
 * {@code _availableWorkerList}) from the current Helix state.
 *
 * <p>Every topic-partition whose topic name starts with {@code SEPARATOR} is treated as a
 * route and mapped to the instances that hold it. An instance holding at least one route is
 * removed from the list of live instances; whatever remains is published as the available
 * worker list. Topic-partitions whose topic name does not start with {@code SEPARATOR} are
 * collected per instance and reported through a single error log entry.
 *
 * <p>The method is {@code synchronized} and additionally holds {@code _lock} while it reads
 * Helix and swaps the two cached fields.
 */
public synchronized void updateCurrentStatus() {
  LOGGER.info("Trying to run worker updateCurrentStatus");
  // Captured under the lock so the summary log reflects exactly the snapshot published by
  // this call, rather than re-reading the shared field after the lock has been released.
  final int availableWorkerCount;
  _lock.lock();
  try {
    Map<TopicPartition, List<String>> currRouteToInstanceMap = new HashMap<>();
    Map<String, List<TopicPartition>> incorrectInstanceToTopicPartitionsMap = new HashMap<>();

    Map<String, Set<TopicPartition>> instanceToTopicPartitionsMap = HelixUtils
        .getInstanceToTopicPartitionsMap(_helixManager);
    LOGGER.debug("For worker instanceToTopicPartitionsMap: {}", instanceToTopicPartitionsMap);

    // Start from all live instances and strike out the ones that already serve a route.
    List<String> currAvailableWorkerList = new ArrayList<>(HelixUtils.liveInstances(_helixManager));

    for (Map.Entry<String, Set<TopicPartition>> entry : instanceToTopicPartitionsMap.entrySet()) {
      String instanceName = entry.getKey();
      // TODO: one instance suppose to have only one partition
      for (TopicPartition tp : entry.getValue()) {
        String pipeline = tp.getTopic();
        if (pipeline.startsWith(SEPARATOR)) {
          currRouteToInstanceMap.computeIfAbsent(tp, key -> new ArrayList<>()).add(instanceName);
          // remove() reports whether the instance was still listed, so a single scan covers
          // both the membership check and the removal.
          boolean wasListed = currAvailableWorkerList.remove(instanceName);
          if (!wasListed) {
            LOGGER.info("not contain {} in {}@{}", instanceName, tp.getTopic(), tp.getPartition());
          }
        } else {
          incorrectInstanceToTopicPartitionsMap
              .computeIfAbsent(instanceName, key -> new ArrayList<>())
              .add(tp);
        }
      }
    }

    _routeToInstanceMap = currRouteToInstanceMap;
    _availableWorkerList = currAvailableWorkerList;
    availableWorkerCount = currAvailableWorkerList.size();

    if (!incorrectInstanceToTopicPartitionsMap.isEmpty()) {
      LOGGER.error("Validate WRONG: wrong incorrectInstanceToTopicPartitionsMap: {}", incorrectInstanceToTopicPartitionsMap);
    }
  } finally {
    _lock.unlock();
  }
  LOGGER.info("For worker {} available", availableWorkerCount);
}