private void validateInstanceToTopicPartitionsMap()

in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/core/ControllerHelixManager.java [248:477]


  /**
   * Cross-checks the manager's view of controller instances and publishes the validation metrics.
   *
   * <p>The method runs three passes:
   * <ol>
   *   <li>Per instance: verifies that exactly one route entry exists, that every non-route entry
   *       belongs to that route, and that the topics and workers reported by the controller's
   *       {@code /topics} and {@code /instances} endpoints match what the manager holds.</li>
   *   <li>Across instances: builds the "non-parity" topic-to-route map that is logged and
   *       exported through {@code _nonParityTopic}.</li>
   *   <li>Across {@code _pipelineToInstanceMap}: flags every worker that is attached to a number
   *       of routes other than one.</li>
   * </ol>
   *
   * <p>Problems are logged with a {@code Validate WRONG} prefix and counted; nothing is thrown
   * for a failed check. Metrics are only written when this manager is the Helix leader, and each
   * counter is incremented by the difference to its current value so that it ends up equal to
   * the count computed by this run.
   *
   * @param instanceToTopicPartitionsMap entries held per controller instance id; entries whose
   *     topic name starts with {@code SEPARATOR} are route entries, all others are topics
   * @param instanceMap holder per controller instance id, used here for its worker set
   */
  private void validateInstanceToTopicPartitionsMap(
      Map<String, Set<TopicPartition>> instanceToTopicPartitionsMap,
      Map<String, InstanceTopicPartitionHolder> instanceMap) {
    LOGGER.info("validateInstanceToTopicPartitionsMap()");
    int validateWrongCount = 0;
    int lowUrgencyValidateWrongCount = 0;

    // Pass 1: per-instance consistency checks.
    for (Map.Entry<String, Set<TopicPartition>> entry : instanceToTopicPartitionsMap.entrySet()) {
      String instanceId = entry.getKey();
      Set<TopicPartition> topicPartitions = entry.getValue();

      HostAndPort hostInfo = null;
      try {
        hostInfo = getHostInfo(instanceId);
      } catch (ControllerException ex) {
        // Keep going with hostInfo == null; the instance is counted as wrong further down.
        LOGGER.error("Validate WRONG: Trying to get hostInfo for InstanceId: {} failed ",
            instanceId, ex);
      }

      // TODO: one instance suppose to have only one route
      Set<TopicPartition> routeSet = collectRouteEntries(topicPartitions);

      if (routeSet.size() != 1) {
        // Report which pipelines the non-route entries claim to belong to.
        Set<String> topicRouteSet = new HashSet<>();
        for (TopicPartition tp : topicPartitions) {
          if (!tp.getTopic().startsWith(SEPARATOR)) {
            topicRouteSet.add(tp.getPipeline());
          }
        }
        validateWrongCount++;
        LOGGER.error(
            "Validate WRONG: Incorrect route found for hostInfo: {}, InstanceId: {}, route: {}, pipelines: {}, #workers: {}, worker: {}",
            hostInfo, instanceId, routeSet, topicRouteSet,
            instanceMap.get(instanceId).getWorkerSet().size(),
            instanceMap.get(instanceId).getWorkerSet());
        continue;
      }

      int partitionCount = 0;
      Set<TopicPartition> mismatchTopicPartition = new HashSet<>();
      TopicPartition route = routeSet.iterator().next();
      String routeString = route.getTopic() + SEPARATOR + route.getPartition();
      for (TopicPartition tp : topicPartitions) {
        if (!tp.getTopic().startsWith(SEPARATOR)) {
          partitionCount += tp.getPartition();
          if (!tp.getPipeline().equals(routeString)) {
            mismatchTopicPartition.add(tp);
          }
        }
      }

      if (hostInfo == null) {
        // The lookup failure itself was already logged above.
        validateWrongCount++;
        continue;
      }
      if (!mismatchTopicPartition.isEmpty()) {
        validateWrongCount++;
        LOGGER.error(
            "Validate WRONG: mismatch route found for hostInfo: {}, InstanceId: {}, route: {}, mismatch: {}, #workers: {}, worker: {}",
            hostInfo, instanceId, routeSet, mismatchTopicPartition,
            instanceMap.get(instanceId).getWorkerSet().size(),
            instanceMap.get(instanceId).getWorkerSet());
        continue;
      }

      // "#topics" excludes the single route entry, hence size() - 1.
      LOGGER.info(
          "Validate OK: hostInfo: {}, InstanceId: {}, route: {}, #topics: {}, #partitions: {}, #workers: {}, worker: {}",
          hostInfo, instanceId, routeSet,
          topicPartitions.size() - 1, partitionCount,
          instanceMap.get(instanceId).getWorkerSet().size(),
          instanceMap.get(instanceId).getWorkerSet());

      try {
        // try find topic mismatch between manager and controller
        String topicResult = HttpClientUtils.getData(_httpClient, _requestConfig,
            hostInfo.getHost(), hostInfo.getPort(), "/topics");
        LOGGER.debug("Get topics from {}: {}", hostInfo, topicResult);
        Set<String> controllerTopics = parseControllerTopics(topicResult);

        // Whatever is left in controllerTopics after this loop is unknown to the manager.
        Set<String> topicOnlyInManager = new HashSet<>();
        for (TopicPartition tp : topicPartitions) {
          if (!controllerTopics.contains(tp.getTopic())) {
            topicOnlyInManager.add(tp.getTopic());
          } else {
            controllerTopics.remove(tp.getTopic());
          }
        }

        // A single manager-only name that starts with SEPARATOR (a route entry) is tolerated.
        if (topicOnlyInManager.size() > 1 || (topicOnlyInManager.size() == 1
            && !topicOnlyInManager.iterator().next().startsWith(SEPARATOR))) {
          validateWrongCount++;
          LOGGER.error(
              "Validate WRONG: hostInfo: {}, InstanceId: {}, route: {}, topic only in manager: {}",
              hostInfo, instanceId, routeSet, topicOnlyInManager);
        }

        if (!controllerTopics.isEmpty()) {
          validateWrongCount++;
          LOGGER.error(
              "Validate WRONG: hostInfo: {}, InstanceId: {}, route: {}, topic only in controller: {}",
              hostInfo, instanceId, routeSet, controllerTopics);
        }
      } catch (Exception e) {
        validateWrongCount++;
        LOGGER.error("Validate WRONG: Get topics error when connecting to {} for route {}",
            hostInfo, routeSet, e);
      }

      try {
        // try find worker mismatch between manager and controller
        String instanceResult = HttpClientUtils.getData(_httpClient, _requestConfig,
            hostInfo.getHost(), hostInfo.getPort(), "/instances");
        LOGGER.debug("Get workers from {}: {}", hostInfo, instanceResult);
        JSONObject instanceResultJson = JSON.parseObject(instanceResult);
        JSONArray allInstances = instanceResultJson.getJSONArray("allInstances");
        Set<String> controllerWorkers = new HashSet<>();
        for (Object instance : allInstances) {
          controllerWorkers.add(String.valueOf(instance));
        }

        // Whatever is left in controllerWorkers after this loop is unknown to the manager.
        Set<String> managerWorkers = instanceMap.get(instanceId).getWorkerSet();
        Set<String> workerOnlyInManager = new HashSet<>();
        for (String worker : managerWorkers) {
          if (!controllerWorkers.contains(worker)) {
            workerOnlyInManager.add(worker);
          } else {
            controllerWorkers.remove(worker);
          }
        }

        // Manager-only workers are counted separately as low urgency and logged at WARN.
        if (!workerOnlyInManager.isEmpty()) {
          lowUrgencyValidateWrongCount++;
          LOGGER.warn(
              "Validate WRONG: hostInfo: {}, InstanceId: {}, route: {}, worker only in manager: {}",
              hostInfo, instanceId, routeSet, workerOnlyInManager);
        }

        if (!controllerWorkers.isEmpty()) {
          validateWrongCount++;
          LOGGER.error(
              "Validate WRONG: hostInfo: {}, InstanceId: {}, route: {}, worker only in controller: {}",
              hostInfo, instanceId, routeSet, controllerWorkers);
        }
      } catch (Exception e) {
        validateWrongCount++;
        LOGGER.error("Validate WRONG: Get workers error when connecting to {} for route {}",
            hostInfo, routeSet, e);
      }
    }

    // Pass 2: build the non-parity topic-to-route map.
    Map<String, Set<String>> topicToRouteMap = new HashMap<>();
    for (Set<TopicPartition> topicPartitions : instanceToTopicPartitionsMap.values()) {
      // TODO: one instance suppose to have only one route
      Set<TopicPartition> routeSet = collectRouteEntries(topicPartitions);
      if (routeSet.isEmpty()) {
        // Already reported as "Incorrect route" in pass 1; without a route there is nothing to
        // record here, and calling iterator().next() would throw NoSuchElementException.
        continue;
      }
      TopicPartition route = routeSet.iterator().next();
      String routeString = route.getTopic() + SEPARATOR + route.getPartition();
      for (TopicPartition tp : topicPartitions) {
        String topicName = tp.getTopic();
        if (topicName.startsWith(SEPARATOR)) {
          continue;
        }
        Set<String> existingRouteSet = topicToRouteMap.get(topicName);
        if (existingRouteSet == null) {
          existingRouteSet = new HashSet<>();
          existingRouteSet.add(routeString);
          topicToRouteMap.put(topicName, existingRouteSet);
        } else {
          // NOTE(review): routeString starts with SEPARATOR, so split(SEPARATOR)[0] looks like it
          // is always the empty string and every existing route matches — confirm whether a
          // different index was intended.
          String routeHead = routeString.split(SEPARATOR)[0];
          existingRouteSet.removeIf(
              existingRoute -> existingRoute.split(SEPARATOR)[0].equals(routeHead));
          if (existingRouteSet.isEmpty()) {
            topicToRouteMap.remove(topicName);
          }
        }
      }
    }
    LOGGER.info("Non-parity topicToRouteMap: {}", topicToRouteMap);
    if (_helixManager.isLeader()) {
      _nonParityTopic.inc(topicToRouteMap.size() - _nonParityTopic.getCount());
    }

    // Pass 3: every worker is expected to serve exactly one route.
    LOGGER.info("For controller _pipelineToInstanceMap:");
    Map<String, Set<String>> workerMap = new HashMap<>();
    for (PriorityQueue<InstanceTopicPartitionHolder> itphQueue : _pipelineToInstanceMap.values()) {
      for (InstanceTopicPartitionHolder itph : itphQueue) {
        for (String worker : itph.getWorkerSet()) {
          workerMap.computeIfAbsent(worker, key -> new HashSet<>()).add(itph.getRouteString());
        }
      }
    }
    for (Map.Entry<String, Set<String>> workerRoutes : workerMap.entrySet()) {
      if (workerRoutes.getValue().size() != 1) {
        validateWrongCount++;
        LOGGER.error("Validate WRONG: wrong worker assignment for worker: {}, route: {}",
            workerRoutes.getKey(), workerRoutes.getValue());
      }
    }

    if (_helixManager.isLeader()) {
      _validateWrongCount.inc(validateWrongCount - _validateWrongCount.getCount());
      _lowUrgencyValidateWrongCount
          .inc(lowUrgencyValidateWrongCount - _lowUrgencyValidateWrongCount.getCount());
      updateMetrics(instanceToTopicPartitionsMap, instanceMap);
    }
  }

  /** Returns the entries of {@code topicPartitions} whose topic name starts with SEPARATOR. */
  private Set<TopicPartition> collectRouteEntries(Set<TopicPartition> topicPartitions) {
    Set<TopicPartition> routeSet = new HashSet<>();
    for (TopicPartition tp : topicPartitions) {
      if (tp.getTopic().startsWith(SEPARATOR)) {
        routeSet.add(tp);
      }
    }
    return routeSet;
  }

  /** Extracts the topic names from the body returned by a controller's {@code /topics}. */
  private static Set<String> parseControllerTopics(String topicResult) {
    Set<String> controllerTopics = new HashSet<>();
    if (topicResult.equals("No topic is added in MirrorMaker Controller!")) {
      return controllerTopics;
    }
    // Drops a fixed 25-character prefix and the final character; presumably a leading label
    // plus the brackets around a ", "-separated list — confirm against the controller response.
    String rawTopicNames = topicResult.substring(25, topicResult.length() - 1);
    for (String name : rawTopicNames.split(", ")) {
      controllerTopics.add(name);
    }
    return controllerTopics;
  }