in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/AutoRebalanceLiveInstanceChangeListener.java [387:463]
/**
 * Moves stuck topic partitions away from the instances currently serving them.
 *
 * <p>The feature is disabled, and {@code null} is returned, unless both
 * {@code _maxStuckPartitionMovements} and {@code _movePartitionAfterStuckMillis} are positive.
 * A stuck partition is moved at most {@code _maxStuckPartitionMovements} times. It is not moved
 * again until at least {@code _movePartitionAfterStuckMillis} has passed since its previous move.
 * Move timestamps are kept in {@code _movePartitionHistoryMap}. A partition's history is dropped
 * as soon as {@code getStuckTopicPartitions()} no longer reports it, and the whole map is cleared
 * when nothing is reported as stuck.
 *
 * <p>Side effects:
 * <ul>
 *   <li>Every partition chosen for moving is removed from its holder in {@code instances}.
 *   <li>The chosen partitions are then passed to {@code assignPartitions}, together with the
 *       instances that serve no stuck partition.
 *   <li>If every instance serves a stuck partition, all of {@code instances} are used as targets
 *       instead. Some partitions may therefore land back on the instance they came from.
 * </ul>
 *
 * @param instances current instance-to-partition holders; holders are mutated in place
 * @return the partitions that were moved, or {@code null} if the feature is disabled, nothing is
 *     stuck, or every stuck partition was skipped by the move limits
 */
private Set<TopicPartition> moveStuckPartitions(Set<InstanceTopicPartitionHolder> instances) {
  if (_maxStuckPartitionMovements <= 0 || _movePartitionAfterStuckMillis <= 0) {
    return null;
  }
  Set<TopicPartition> allStuckPartitions = getStuckTopicPartitions();
  if (allStuckPartitions.isEmpty()) {
    _movePartitionHistoryMap.clear();
    return null;
  }
  // Forget the move history of partitions that are no longer reported as stuck.
  _movePartitionHistoryMap.keySet().retainAll(allStuckPartitions);

  // Instances serving no stuck partition are the preferred move targets.
  // NOTE(review): a TreeSet treats comparator ties as duplicates; confirm that
  // perPartitionWorkloadComparator tie-breaks so distinct instances are never dropped.
  Set<TopicPartition> stuckPartitionsToMove = new HashSet<>();
  TreeSet<InstanceTopicPartitionHolder> nonStuckInstances = new TreeSet<>(
      InstanceTopicPartitionHolder
          .perPartitionWorkloadComparator(_helixMirrorMakerManager.getWorkloadInfoRetriever(),
              null));
  long now = System.currentTimeMillis();
  for (InstanceTopicPartitionHolder itph : instances) {
    boolean isStuckInstance = false;
    for (TopicPartition tp : allStuckPartitions) {
      if (!itph.getServingTopicPartitionSet().contains(tp)) {
        continue;
      }
      // The instance counts as stuck even if this partition is skipped below, so it is
      // never chosen as a target for the partitions being moved.
      isStuckInstance = true;
      List<Long> moveHistory = _movePartitionHistoryMap.get(tp);
      if (moveHistory == null) {
        moveHistory = new ArrayList<>();
        _movePartitionHistoryMap.put(tp, moveHistory);
      } else if (moveHistory.size() >= _maxStuckPartitionMovements) {
        LOGGER.info("moveStuckPartitions: Skip moving stuck partition " + tp + " from "
            + itph.getInstanceName() + " because moving reaches upper bound of "
            + _maxStuckPartitionMovements);
        continue;
      } else if (!moveHistory.isEmpty()
          && now - moveHistory.get(moveHistory.size() - 1) < _movePartitionAfterStuckMillis) {
        // Compare elapsed time instead of (lastMove + delay) so that a very large configured
        // delay cannot overflow and make the partition look eligible right away.
        LOGGER.info("moveStuckPartitions: Skip moving stuck partition " + tp + " from "
            + itph.getInstanceName() + " because it was moved recently");
        continue;
      }
      LOGGER.info("moveStuckPartitions: Trying to move stuck partition " + tp + " from " + itph
          .getInstanceName());
      moveHistory.add(now);
      // Iterating allStuckPartitions (not the serving set), so removing here is safe.
      itph.removeTopicPartition(tp);
      stuckPartitionsToMove.add(tp);
    }
    if (!isStuckInstance) {
      nonStuckInstances.add(itph);
    }
  }
  if (stuckPartitionsToMove.isEmpty()) {
    LOGGER.info("moveStuckPartitions: No stuck partitions can be moved");
    return null;
  }
  if (nonStuckInstances.isEmpty()) {
    // Every instance serves a stuck partition: shuffle the moved partitions across all
    // instances, accepting that some may end up on the same instance again.
    LOGGER.info(
        "moveStuckPartitions: All instances are stuck. Shuffle the stuck partitions instead");
    nonStuckInstances.addAll(instances);
  }
  assignPartitions(nonStuckInstances, new ArrayList<>(stuckPartitionsToMove));
  return stuckPartitionsToMove;
}