in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/core/ControllerHelixManager.java [248:477]
/**
 * Cross-checks the manager's view of every controller instance and records how many
 * inconsistencies were found.
 *
 * <p>The method runs the following passes, in order:
 * <ol>
 *   <li>Per instance: collect the route entries (topic names starting with {@code SEPARATOR}).
 *       Anything other than exactly one route is counted as a failure. With exactly one route,
 *       every non-route entry's pipeline must equal the route string; if so (and the host info
 *       could be resolved) the controller's {@code /topics} and {@code /instances} endpoints
 *       are fetched and compared against the manager's topics and workers.</li>
 *   <li>Non-parity pass: builds {@code topicToRouteMap} from the non-route topics of every
 *       instance that has a route, logs it, and (on the leader) moves {@code _nonParityTopic}
 *       to the map's size.</li>
 *   <li>Worker pass: walks {@code _pipelineToInstanceMap} and counts a failure for every worker
 *       that is attached to a number of distinct route strings other than one.</li>
 *   <li>On the leader only: moves {@code _validateWrongCount} and
 *       {@code _lowUrgencyValidateWrongCount} to the totals computed here and calls
 *       {@code updateMetrics}.</li>
 * </ol>
 *
 * @param instanceToTopicPartitionsMap instance id to the topic/route entries assigned to it
 * @param instanceMap instance id to its holder; used to read each instance's worker set
 */
private void validateInstanceToTopicPartitionsMap(
    Map<String, Set<TopicPartition>> instanceToTopicPartitionsMap,
    Map<String, InstanceTopicPartitionHolder> instanceMap) {
  LOGGER.info("validateInstanceToTopicPartitionsMap()");
  int validateWrongCount = 0;
  int lowUrgencyValidateWrongCount = 0;
  // Literal answer of the controller's /topics endpoint when it has no topics.
  final String noTopicMessage = "No topic is added in MirrorMaker Controller!";

  for (Map.Entry<String, Set<TopicPartition>> entry : instanceToTopicPartitionsMap.entrySet()) {
    String instanceId = entry.getKey();
    Set<TopicPartition> topicPartitions = entry.getValue();

    HostAndPort hostInfo = null;
    try {
      hostInfo = getHostInfo(instanceId);
    } catch (ControllerException ex) {
      // Keep going with hostInfo == null; the failure is counted further down. The exception is
      // passed as the trailing argument so the cause is not lost.
      LOGGER
          .error("Validate WRONG: Trying to get hostInfo for InstanceId: {} failed ", instanceId,
              ex);
    }

    Set<TopicPartition> routeSet = new HashSet<>();
    // TODO: one instance suppose to have only one route
    for (TopicPartition tp : topicPartitions) {
      String topicName = tp.getTopic();
      if (topicName.startsWith(SEPARATOR)) {
        routeSet.add(tp);
      }
    }

    if (routeSet.size() != 1) {
      // Zero or several routes: report the pipelines of the regular topics for diagnosis.
      Set<String> topicRouteSet = new HashSet<>();
      for (TopicPartition tp : topicPartitions) {
        String topicName = tp.getTopic();
        if (!topicName.startsWith(SEPARATOR)) {
          topicRouteSet.add(tp.getPipeline());
        }
      }
      validateWrongCount++;
      LOGGER.error(
          "Validate WRONG: Incorrect route found for hostInfo: {}, InstanceId: {}, route: {}, pipelines: {}, #workers: {}, worker: {}",
          hostInfo, instanceId, routeSet, topicRouteSet,
          instanceMap.get(instanceId).getWorkerSet().size(),
          instanceMap.get(instanceId).getWorkerSet());
    } else {
      int partitionCount = 0;
      Set<TopicPartition> mismatchTopicPartition = new HashSet<>();
      TopicPartition route = routeSet.iterator().next();
      String routeString = route.getTopic() + SEPARATOR + route.getPartition();
      for (TopicPartition tp : topicPartitions) {
        String topicName = tp.getTopic();
        if (!topicName.startsWith(SEPARATOR)) {
          partitionCount += tp.getPartition();
          if (!tp.getPipeline().equals(routeString)) {
            mismatchTopicPartition.add(tp);
          }
        }
      }

      if (mismatchTopicPartition.isEmpty() && hostInfo != null) {
        // topicPartitions holds the single route entry as well, hence the "- 1" for #topics.
        LOGGER.info(
            "Validate OK: hostInfo: {}, InstanceId: {}, route: {}, #topics: {}, #partitions: {}, #workers: {}, worker: {}",
            hostInfo, instanceId, routeSet,
            topicPartitions.size() - 1, partitionCount,
            instanceMap.get(instanceId).getWorkerSet().size(),
            instanceMap.get(instanceId).getWorkerSet());
        try {
          // try find topic mismatch between manager and controller
          String topicResult = HttpClientUtils.getData(_httpClient, _requestConfig,
              hostInfo.getHost(), hostInfo.getPort(), "/topics");
          LOGGER.debug("Get topics from {}: {}", hostInfo, topicResult);
          String rawTopicNames = topicResult;
          if (!rawTopicNames.equals(noTopicMessage)) {
            // Strip a fixed 25-character prefix and the final character of the response.
            // NOTE(review): presumably the response wraps the list in a label and brackets;
            // confirm against the controller's /topics output.
            rawTopicNames = topicResult.substring(25, topicResult.length() - 1);
          }
          Set<String> controllerTopics = new HashSet<>();
          if (!rawTopicNames.equals(noTopicMessage)) {
            String[] topicNames = rawTopicNames.split(", ");
            for (String name : topicNames) {
              controllerTopics.add(name);
            }
          }
          // After this loop controllerTopics only holds topics the manager does not know.
          Set<String> topicOnlyInManager = new HashSet<>();
          for (TopicPartition tp : topicPartitions) {
            if (!controllerTopics.contains(tp.getTopic())) {
              topicOnlyInManager.add(tp.getTopic());
            } else {
              controllerTopics.remove(tp.getTopic());
            }
          }
          // A single manager-only entry is tolerated when it is the route entry itself.
          if (topicOnlyInManager.size() > 1 || (topicOnlyInManager.size() == 1
              && !topicOnlyInManager.iterator().next().startsWith(SEPARATOR))) {
            validateWrongCount++;
            LOGGER.error(
                "Validate WRONG: hostInfo: {}, InstanceId: {}, route: {}, topic only in manager: {}",
                hostInfo, instanceId, routeSet, topicOnlyInManager);
          }
          if (!controllerTopics.isEmpty()) {
            validateWrongCount++;
            LOGGER.error(
                "Validate WRONG: hostInfo: {}, InstanceId: {}, route: {}, topic only in controller: {}",
                hostInfo, instanceId, routeSet, controllerTopics);
          }
        } catch (Exception e) {
          validateWrongCount++;
          LOGGER.error("Validate WRONG: Get topics error when connecting to {} for route {}",
              hostInfo, routeSet, e);
        }
        try {
          // try find worker mismatch between manager and controller
          String instanceResult = HttpClientUtils.getData(_httpClient, _requestConfig,
              hostInfo.getHost(), hostInfo.getPort(), "/instances");
          LOGGER.debug("Get workers from {}: {}", hostInfo, instanceResult);
          JSONObject instanceResultJson = JSON.parseObject(instanceResult);
          JSONArray allInstances = instanceResultJson.getJSONArray("allInstances");
          Set<String> controllerWorkers = new HashSet<>();
          for (Object instance : allInstances) {
            controllerWorkers.add(String.valueOf(instance));
          }
          Set<String> managerWorkers = instanceMap.get(instanceId).getWorkerSet();
          // After this loop controllerWorkers only holds workers the manager does not know.
          Set<String> workerOnlyInManager = new HashSet<>();
          for (String worker : managerWorkers) {
            if (!controllerWorkers.contains(worker)) {
              workerOnlyInManager.add(worker);
            } else {
              controllerWorkers.remove(worker);
            }
          }
          if (!workerOnlyInManager.isEmpty()) {
            // Manager-only workers are tracked separately as low urgency and logged at WARN.
            lowUrgencyValidateWrongCount++;
            LOGGER.warn(
                "Validate WRONG: hostInfo: {}, InstanceId: {}, route: {}, worker only in manager: {}",
                hostInfo, instanceId, routeSet, workerOnlyInManager);
          }
          if (!controllerWorkers.isEmpty()) {
            validateWrongCount++;
            LOGGER.error(
                "Validate WRONG: hostInfo: {}, InstanceId: {}, route: {}, worker only in controller: {}",
                hostInfo, instanceId, routeSet, controllerWorkers);
          }
        } catch (Exception e) {
          validateWrongCount++;
          LOGGER.error("Validate WRONG: Get workers error when connecting to {} for route {}",
              hostInfo, routeSet, e);
        }
      } else if (hostInfo == null) {
        // Already logged when getHostInfo failed above; only count it here.
        validateWrongCount++;
      } else {
        validateWrongCount++;
        LOGGER.error(
            "Validate WRONG: mismatch route found for hostInfo: {}, InstanceId: {}, route: {}, mismatch: {}, #workers: {}, worker: {}",
            hostInfo, instanceId, routeSet, mismatchTopicPartition,
            instanceMap.get(instanceId).getWorkerSet().size(),
            instanceMap.get(instanceId).getWorkerSet());
      }
    }
  }

  Map<String, Set<String>> topicToRouteMap = new HashMap<>();
  for (Map.Entry<String, Set<TopicPartition>> entry : instanceToTopicPartitionsMap.entrySet()) {
    Set<TopicPartition> topicPartitions = entry.getValue();
    Set<TopicPartition> routeSet = new HashSet<>();
    // TODO: one instance suppose to have only one route
    for (TopicPartition tp : topicPartitions) {
      String topicName = tp.getTopic();
      if (topicName.startsWith(SEPARATOR)) {
        routeSet.add(tp);
      }
    }
    if (routeSet.isEmpty()) {
      // An instance without a route was already counted as a failure in the first pass.
      // Calling iterator().next() on the empty set would throw NoSuchElementException and
      // abort the remaining checks and the metric update, so skip it here.
      continue;
    }
    // With several routes (also reported above) an arbitrary one is used, as before.
    TopicPartition route = routeSet.iterator().next();
    String routeString = route.getTopic() + SEPARATOR + route.getPartition();
    for (TopicPartition tp : topicPartitions) {
      String topicName = tp.getTopic();
      if (!topicName.startsWith(SEPARATOR)) {
        if (!topicToRouteMap.containsKey(topicName)) {
          topicToRouteMap.put(topicName, new HashSet<>());
          topicToRouteMap.get(topicName).add(routeString);
        } else {
          // Topic seen before: drop recorded routes whose first SEPARATOR-delimited segment
          // matches this route's, and forget the topic once no route is left.
          // NOTE(review): routeString begins with a SEPARATOR-prefixed topic name, so segment
          // [0] may be the empty string for every route - confirm this comparison is intended.
          Set<String> existingRouteSet = topicToRouteMap.get(topicName);
          Iterator<String> iter = existingRouteSet.iterator();
          while (iter.hasNext()) {
            String existingRoute = iter.next();
            if (existingRoute.split(SEPARATOR)[0].equals(routeString.split(SEPARATOR)[0])) {
              iter.remove();
            }
          }
          if (existingRouteSet.isEmpty()) {
            topicToRouteMap.remove(topicName);
          }
        }
      }
    }
  }
  LOGGER.info("Non-parity topicToRouteMap: {}", topicToRouteMap);
  if (_helixManager.isLeader()) {
    // inc() by the delta so that the reported count becomes the freshly computed value.
    _nonParityTopic.inc(topicToRouteMap.size() - _nonParityTopic.getCount());
  }

  LOGGER.info("For controller _pipelineToInstanceMap:");
  // worker -> distinct route strings of the holders the worker is attached to
  Map<String, Set<String>> workerMap = new HashMap<>();
  for (String pipeline : _pipelineToInstanceMap.keySet()) {
    PriorityQueue<InstanceTopicPartitionHolder> itphSet = _pipelineToInstanceMap.get(pipeline);
    for (InstanceTopicPartitionHolder itph : itphSet) {
      Set<String> workers = itph.getWorkerSet();
      for (String worker : workers) {
        if (workerMap.containsKey(worker)) {
          workerMap.get(worker).add(itph.getRouteString());
        } else {
          Set<String> routeSet = new HashSet<>();
          routeSet.add(itph.getRouteString());
          workerMap.put(worker, routeSet);
        }
      }
    }
  }
  for (String worker : workerMap.keySet()) {
    if (workerMap.get(worker).size() != 1) {
      validateWrongCount++;
      LOGGER.error("Validate WRONG: wrong worker assignment for worker: {}, route: {}", worker,
          workerMap.get(worker));
    }
  }

  if (_helixManager.isLeader()) {
    _validateWrongCount.inc(validateWrongCount - _validateWrongCount.getCount());
    _lowUrgencyValidateWrongCount
        .inc(lowUrgencyValidateWrongCount - _lowUrgencyValidateWrongCount.getCount());
    updateMetrics(instanceToTopicPartitionsMap, instanceMap);
  }
}