private Set moveStuckPartitions()

in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/AutoRebalanceLiveInstanceChangeListener.java [387:463]


  /**
   * Tries to relocate stuck partitions away from the instances currently serving them.
   *
   * <p>Nothing is done (and {@code null} is returned) when either
   * {@code _maxStuckPartitionMovements} or {@code _movePartitionAfterStuckMillis} is not
   * positive. When {@link #getStuckTopicPartitions()} reports no stuck partition, the whole move
   * history is cleared; otherwise history entries of partitions that are no longer reported as
   * stuck are dropped.
   *
   * <p>A stuck partition is skipped when its recorded number of moves has reached
   * {@code _maxStuckPartitionMovements}, or when its most recent recorded move is less than
   * {@code _movePartitionAfterStuckMillis} milliseconds old. Every other stuck partition is
   * removed from the holder serving it, gets the current time appended to its history, and is
   * handed to {@code assignPartitions} together with the instances that serve no stuck
   * partition. If every instance serves at least one stuck partition, all given instances are
   * used as targets instead.
   *
   * @param instances instance holders to inspect; holders serving a movable stuck partition are
   *     modified in place (the partition is removed from them)
   * @return the partitions that were removed and passed to {@code assignPartitions}, or
   *     {@code null} when the feature is disabled, nothing is stuck, or no stuck partition is
   *     eligible to be moved
   */
  private Set<TopicPartition> moveStuckPartitions(Set<InstanceTopicPartitionHolder> instances) {
    boolean featureEnabled = _maxStuckPartitionMovements > 0 && _movePartitionAfterStuckMillis > 0;
    if (!featureEnabled) {
      return null;
    }

    Set<TopicPartition> stuckNow = getStuckTopicPartitions();
    if (stuckNow.isEmpty()) {
      _movePartitionHistoryMap.clear();
      return null;
    }

    // Forget the move history of every partition that is no longer reported as stuck.
    for (Iterator<Entry<TopicPartition, List<Long>>> historyIt =
        _movePartitionHistoryMap.entrySet().iterator(); historyIt.hasNext(); ) {
      if (!stuckNow.contains(historyIt.next().getKey())) {
        historyIt.remove();
      }
    }

    // Walk all instances: pull movable stuck partitions off their holders and collect the
    // instances that serve no stuck partition at all as move targets.
    Set<TopicPartition> toMove = new HashSet<>();
    TreeSet<InstanceTopicPartitionHolder> targets = new TreeSet<>(
        InstanceTopicPartitionHolder.perPartitionWorkloadComparator(
            _helixMirrorMakerManager.getWorkloadInfoRetriever(), null));
    long now = System.currentTimeMillis();
    for (InstanceTopicPartitionHolder holder : instances) {
      boolean servesStuckPartition = false;
      for (TopicPartition partition : stuckNow) {
        if (!holder.getServingTopicPartitionSet().contains(partition)) {
          continue;
        }
        // The instance counts as stuck even if this particular partition ends up being skipped.
        servesStuckPartition = true;

        List<Long> history = _movePartitionHistoryMap.get(partition);
        if (history == null) {
          // First time this partition is seen as stuck: start tracking its moves.
          history = new ArrayList<>();
          _movePartitionHistoryMap.put(partition, history);
        } else {
          if (history.size() >= _maxStuckPartitionMovements) {
            LOGGER.info("moveStuckPartitions: Skip moving stuck partition " + partition + " from "
                + holder.getInstanceName() + " because moving reaches upper bound of "
                + _maxStuckPartitionMovements);
            continue;
          }
          boolean movedRecently = !history.isEmpty()
              && history.get(history.size() - 1) + _movePartitionAfterStuckMillis > now;
          if (movedRecently) {
            LOGGER.info("moveStuckPartitions: Skip moving stuck partition " + partition + " from "
                + holder.getInstanceName() + " because it was moved recently");
            continue;
          }
        }

        LOGGER.info("moveStuckPartitions: Trying to move stuck partition " + partition + " from "
            + holder.getInstanceName());
        history.add(now);
        holder.removeTopicPartition(partition);
        toMove.add(partition);
      }
      if (!servesStuckPartition) {
        targets.add(holder);
      }
    }

    if (toMove.isEmpty()) {
      LOGGER.info("moveStuckPartitions: No stuck partitions can be moved");
      return null;
    }

    if (targets.isEmpty()) {
      // Every instance serves a stuck partition, so fall back to spreading the stuck
      // partitions over all instances; some may land on the instance they came from.
      LOGGER.info(
          "moveStuckPartitions: All instances are stuck. Shuffle the stuck partitions instead");
      targets.addAll(instances);
    }
    assignPartitions(targets, new ArrayList<>(toMove));

    return toMove;
  }