public synchronized void handleLiveInstanceChange()

in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/core/ControllerHelixManager.java [728:928]


  /**
   * Reacts to a change in the set of live instances by repairing route assignments and then,
   * unless told otherwise, rebalancing/scaling the cluster.
   *
   * <p>When auto-balancing is enabled (or {@code forceBalance} is set) this method:
   * <ol>
   *   <li>moves each route (a resource whose name starts with {@code SEPARATOR}) held by an
   *       offline controller to a controller taken from {@code _availableControllerList}, waits
   *       (bounded) for that controller to report online, then moves the route's topics to it;</li>
   *   <li>reassigns topics held by an instance that holds no route to the instance that owns the
   *       topics' route in {@code _pipelineToInstanceMap};</li>
   *   <li>replaces offline workers through {@code _workerHelixManager}.</li>
   * </ol>
   * If {@code onlyCheckOffline} is {@code false}, it then refreshes the cached status and, when
   * auto-scaling is enabled, scales the current cluster.
   *
   * <p>Everything runs while holding {@code _lock} in addition to this object's monitor.
   *
   * @param onlyCheckOffline if {@code true}, only repair offline controllers/workers and return
   *     without the rebalancing/scaling step
   * @param forceBalance if {@code true}, repair offline instances even when auto-balancing is
   *     disabled
   * @throws Exception if any of the underlying Helix or status-refresh calls fail
   */
  public synchronized void handleLiveInstanceChange(boolean onlyCheckOffline, boolean forceBalance)
      throws Exception {
    // Upper bound on how long we wait for a newly assigned controller before moving topics to it.
    final long controllerOnlineTimeoutMs = 30000L;
    // Based on testing, the wait time is usually in the order of 100 ms
    final long controllerOnlinePollMs = 100L;

    _lock.lock();
    try {
      LOGGER.info("handleLiveInstanceChange, onlyCheckOffline : {}, forceBalance : {}",
          onlyCheckOffline, forceBalance);

      // Check if any controller in route is down
      Map<String, Set<TopicPartition>> instanceToTopicPartitionsMap = HelixUtils
          .getInstanceToTopicPartitionsMap(_helixManager,
              _kafkaValidationManager.getClusterToObserverMap());
      List<String> liveInstances = HelixUtils.liveInstances(_helixManager);
      List<String> instanceToReplace = new ArrayList<>();
      boolean routeControllerDown = false;
      // Check if any worker in route is down
      boolean routeWorkerDown = false;
      if (isAutoBalancingEnabled() || forceBalance) {
        for (String instanceName : instanceToTopicPartitionsMap.keySet()) {
          if (!liveInstances.contains(instanceName)) {
            routeControllerDown = true;
            instanceToReplace.add(instanceName);
          }
        }

        LOGGER.info("Controller need to replace: {}", instanceToReplace);
        // Make sure controller status is up-to-date
        updateCurrentStatus();

        // Happy scenario: instance contains route topic
        for (String instance : instanceToReplace) {
          for (TopicPartition tpOrRoute : instanceToTopicPartitionsMap.get(instance)) {
            if (!tpOrRoute.getTopic().startsWith(SEPARATOR)) {
              continue;
            }
            String pipeline = tpOrRoute.getTopic();
            int routeId = tpOrRoute.getPartition();
            String routeIdStr = String.valueOf(routeId);
            if (_availableControllerList.isEmpty()) {
              LOGGER
                  .warn("no available controller to process the route {}@{}", pipeline, routeId);
              break;
            }
            String newInstanceName = _availableControllerList.remove(0);
            LOGGER.info("Controller {} in route {}@{} will be replaced by {}", instance, pipeline,
                routeId, newInstanceName);

            // Topics the offline controller was serving; they follow the route to the new owner.
            List<TopicPartition> tpToReassign = new ArrayList<>();
            PriorityQueue<InstanceTopicPartitionHolder> itphList =
                _pipelineToInstanceMap.get(pipeline);
            if (itphList == null) {
              // The cached status has no entry for this pipeline: still move the route itself;
              // its topics can be picked up by a later pass once the status is refreshed.
              LOGGER.warn("Pipeline {} not found in current status, only route {}@{} is moved",
                  pipeline, pipeline, routeId);
            } else {
              for (InstanceTopicPartitionHolder itph : itphList) {
                if (itph.getInstanceName().equals(instance)) {
                  tpToReassign.addAll(itph.getServingTopicPartitionSet());
                  // TODO: is it possible to have different route on same host?
                  break;
                }
              }
            }

            // Helix doesn't guarantee the order of execution, so we have to wait for new controller
            // to be online before reassigning topics. But this might cause long rebalance time.
            _helixAdmin.setResourceIdealState(_helixClusterName, pipeline,
                IdealStateBuilder
                    .resetCustomIdealStateFor(
                        _helixAdmin.getResourceIdealState(_helixClusterName, pipeline),
                        pipeline, routeIdStr, newInstanceName));

            long ts1 = System.currentTimeMillis();
            while (!isControllerOnline(newInstanceName, pipeline, routeIdStr)) {
              if (System.currentTimeMillis() - ts1 > controllerOnlineTimeoutMs) {
                LOGGER.warn(
                    "Controller {} in route {}@{} is not online after {} ms, reassigning topics anyway",
                    newInstanceName, pipeline, routeId, controllerOnlineTimeoutMs);
                break;
              }
              try {
                Thread.sleep(controllerOnlinePollMs);
              } catch (InterruptedException e) {
                // Keep the interrupt visible to callers and stop waiting (same as a timeout).
                Thread.currentThread().interrupt();
                LOGGER.warn("Interrupted while waiting for controller {} in route {}@{} to be online",
                    newInstanceName, pipeline, routeId, e);
                break;
              }
            }

            long ts2 = System.currentTimeMillis();
            LOGGER.info("Controller {} in route {}@{} is replaced by {}, it took {} ms",
                instance, pipeline, routeId, newInstanceName, ts2 - ts1);

            String routeString = pipeline + SEPARATOR + routeId;
            for (TopicPartition tp : tpToReassign) {
              _helixAdmin.setResourceIdealState(_helixClusterName, tp.getTopic(),
                  IdealStateBuilder
                      .resetCustomIdealStateFor(
                          _helixAdmin.getResourceIdealState(_helixClusterName, tp.getTopic()),
                          tp.getTopic(), routeString, newInstanceName));
            }

            LOGGER.info(
                "Controller {} in route {}@{} is replaced by {}, topics are reassigned, it took {} ms",
                instance, pipeline, routeId, newInstanceName, System.currentTimeMillis() - ts2);
            // Only the first route found on this instance is handled per pass.
            break;
          }
        }

        // Failure scenario: instance doesn't contain route topic
        // e.g. route and the topic in that route are not assigned to the same host
        // In this case, assume the instance of the route is correct and reassign the topic to that host
        for (Map.Entry<String, Set<TopicPartition>> entry
            : instanceToTopicPartitionsMap.entrySet()) {
          String instance = entry.getKey();
          Set<TopicPartition> topicPartitionSet = entry.getValue();
          if (topicPartitionSet.isEmpty()) {
            continue;
          }
          boolean foundRoute = topicPartitionSet.stream()
              .anyMatch(candidate -> candidate.getTopic().startsWith(SEPARATOR));
          if (foundRoute) {
            continue;
          }
          routeControllerDown = true;
          // Find the host for its route
          String route = topicPartitionSet.iterator().next().getPipeline();
          String instanceForRoute = null;
          PriorityQueue<InstanceTopicPartitionHolder> routeHolders =
              _pipelineToInstanceMap.get(getPipelineFromRoute(route));
          if (routeHolders != null) {
            for (InstanceTopicPartitionHolder itph : routeHolders) {
              if (itph.getRouteString().equals(route)) {
                instanceForRoute = itph.getInstanceName();
                break;
              }
            }
          }

          if (instanceForRoute == null) {
            // Never write a null instance into the ideal state; leave the topics where they are.
            LOGGER.warn("Cannot find the instance for route {}, skip reassigning {} from {}",
                route, topicPartitionSet, instance);
            continue;
          }

          LOGGER.info("Need to reassign: {} from {} to {}", topicPartitionSet, instance,
              instanceForRoute);
          for (TopicPartition tp : topicPartitionSet) {
            _helixAdmin.setResourceIdealState(_helixClusterName, tp.getTopic(),
                IdealStateBuilder
                    .resetCustomIdealStateFor(
                        _helixAdmin.getResourceIdealState(_helixClusterName, tp.getTopic()),
                        tp.getTopic(), route, instanceForRoute));
          }
        }

        if (routeControllerDown) {
          updateCurrentStatus();
        }

        HelixManager workerManager = _workerHelixManager.getHelixManager();
        Map<String, Set<TopicPartition>> workerInstanceToTopicPartitionsMap = HelixUtils
            .getInstanceToTopicPartitionsMap(workerManager, null);
        List<String> workerLiveInstances = HelixUtils.liveInstances(workerManager);
        Map<String, List<String>> workerPipelineToRouteIdToReplace = new HashMap<>();
        List<String> workerToReplace = new ArrayList<>();

        for (Map.Entry<String, Set<TopicPartition>> entry
            : workerInstanceToTopicPartitionsMap.entrySet()) {
          String instanceName = entry.getKey();
          if (workerLiveInstances.contains(instanceName)) {
            continue;
          }
          Set<TopicPartition> workerRoutes = entry.getValue();
          if (workerRoutes == null || workerRoutes.isEmpty()) {
            // Nothing is assigned to this offline worker, so there is no route to repair.
            LOGGER.info("Offline worker {} has no route assigned, skip", instanceName);
            continue;
          }
          routeWorkerDown = true;
          TopicPartition route = workerRoutes.iterator().next();
          workerPipelineToRouteIdToReplace
              .computeIfAbsent(route.getTopic(), key -> new ArrayList<>())
              .add(String.valueOf(route.getPartition()));
          workerToReplace.add(instanceName);
          LOGGER.info("Worker changed: {} for {}", instanceName, route);
        }
        if (!routeWorkerDown) {
          LOGGER.info("No worker in route is changed, do nothing!");
        } else {
          LOGGER.info("Worker need to replace: {}, {}", workerToReplace,
              workerPipelineToRouteIdToReplace);
          // Make sure worker status is up-to-date
          if (!routeControllerDown) {
            updateCurrentStatus();
          }
          _workerHelixManager
              .replaceWorkerInMirrorMaker(workerPipelineToRouteIdToReplace, workerToReplace);

          updateCurrentStatus();
        }
      } else {
        LOGGER.info("AutoBalancing is disabled, do nothing");
      }

      if (onlyCheckOffline) {
        return;
      }

      LOGGER.info("Start rebalancing current cluster");
      // Refresh status unless a repair step above already refreshed it after changing assignments
      if (!routeControllerDown && !routeWorkerDown) {
        updateCurrentStatus();
      }
      if (isAutoScalingEnabled()) {
        scaleCurrentCluster();
      } else {
        LOGGER.info("AutoScaling is disabled, do nothing");
      }

    } finally {
      _lock.unlock();
    }
  }