in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/AutoRebalanceLiveInstanceChangeListener.java [465:592]
/**
 * Re-balances topic partitions across the given worker instances.
 *
 * <p>The method proceeds in these steps:
 * <ol>
 *   <li>Calls {@code moveStuckPartitions(instances)}; a non-empty result marks the assignment
 *       as changed.</li>
 *   <li>Splits {@code partitionsToBeAssigned} (minus blacklisted ones) into lagging and
 *       non-lagging partitions, and records the lag of every lagging partition, including
 *       the ones already served by {@code instances}.</li>
 *   <li>Re-distributes lagging partitions over the instances ordered by a weighter that
 *       counts only lagging partitions.</li>
 *   <li>Marks at most {@code instances.size() * _maxDedicatedInstancesRatio} instances that
 *       serve lagging partitions as dedicated: their non-lagging partitions are removed and
 *       queued for re-assignment.</li>
 *   <li>Re-distributes the non-lagging partitions over the remaining instances, keeping
 *       lagging partitions and freshly moved stuck partitions pinned.</li>
 * </ol>
 *
 * @param instances                  workers that may take partitions; an empty set aborts
 * @param partitionsToBeAssigned     partitions that currently need an owner
 * @param blacklistedTopicPartitions partitions that must be skipped during assignment
 * @param forced                     forwarded to {@code removeOverloadedParitions} for the
 *                                   non-lagging pass (the lagging pass always passes
 *                                   {@code true})
 * @return {@code true} if stuck partitions were moved or either assignment pass reported a
 *         change; {@code false} otherwise, including when there are no instances
 */
private boolean balancePartitions(
    Set<InstanceTopicPartitionHolder> instances,
    List<TopicPartition> partitionsToBeAssigned,
    Set<TopicPartition> blacklistedTopicPartitions,
    boolean forced) {
  if (instances.isEmpty()) {
    LOGGER.error("No workers to take " + partitionsToBeAssigned.size() + " partitions");
    return false;
  }

  boolean assignmentChanged = false;
  Set<TopicPartition> stuckPartitionsMoved = moveStuckPartitions(instances);
  if (stuckPartitionsMoved != null && !stuckPartitionsMoved.isEmpty()) {
    assignmentChanged = true;
  }

  // collect lag information for all topic partitions
  final Map<TopicPartition, Long> lagTimeMap = new HashMap<>();
  List<TopicPartition> laggingPartitions = new ArrayList<>();
  List<TopicPartition> nonLaggingPartitions = new ArrayList<>();
  for (TopicPartition tp : partitionsToBeAssigned) {
    if (blacklistedTopicPartitions.contains(tp)) {
      // '{}' is the placeholder understood by the parameterized logger call; '%s' would be
      // printed literally and the partition would never show up in the log.
      LOGGER.info("{} in blacklist, skip assignment", tp);
      continue;
    }
    TopicPartitionLag lagInfo = _helixMirrorMakerManager.calculateLagTime(tp);
    if (lagInfo != null && lagInfo.getLag() > 0) {
      lagTimeMap.put(tp, lagInfo.getLag());
      laggingPartitions.add(tp);
    } else {
      nonLaggingPartitions.add(tp);
    }
  }
  // partitions that are already being served may be lagging as well
  for (InstanceTopicPartitionHolder instance : instances) {
    for (TopicPartition tp : instance.getServingTopicPartitionSet()) {
      TopicPartitionLag lagInfo = _helixMirrorMakerManager.calculateLagTime(tp);
      if (lagInfo != null && lagInfo.getLag() > 0) {
        lagTimeMap.put(tp, lagInfo.getLag());
      }
    }
  }
  LOGGER.info("balancePartitions: Current lagging partitions: " + lagTimeMap);

  // re-distribute the lagging partitions first
  ITopicWorkloadWeighter laggingPartitionWeighter = (TopicPartition tp) ->
      lagTimeMap.containsKey(tp) ? 1.0 : 0.0;
  TreeSet<InstanceTopicPartitionHolder> instancesSortedByLag = new TreeSet<>(
      InstanceTopicPartitionHolder.perPartitionWorkloadComparator(
          _helixMirrorMakerManager.getWorkloadInfoRetriever(), laggingPartitionWeighter));
  instancesSortedByLag.addAll(instances);
  List<TopicPartition> reassignedLaggingPartitions = removeOverloadedParitions(
      instancesSortedByLag, laggingPartitions, null, true, laggingPartitionWeighter);
  laggingPartitions.addAll(reassignedLaggingPartitions);
  if (assignPartitions(instancesSortedByLag, laggingPartitions)) {
    assignmentChanged = true;
  }

  // dedicated instances serve only lagging partitions
  int maxDedicated = (int) (instances.size() * _maxDedicatedInstancesRatio);
  TreeSet<InstanceTopicPartitionHolder> orderedInstances = new TreeSet<>(
      InstanceTopicPartitionHolder.perPartitionWorkloadComparator(
          _helixMirrorMakerManager.getWorkloadInfoRetriever(), null));
  // instancesSortedByLag are sorted by lags, so the instances with lags appear after the
  // instances with non-lags only
  List<InstanceTopicPartitionHolder> dedicatedInstances = new ArrayList<>();
  for (InstanceTopicPartitionHolder instance : instancesSortedByLag) {
    // '>=' keeps the number of dedicated instances at or below maxDedicated; with '>' one
    // extra instance would be dedicated (and one would be dedicated even when the cap is 0).
    if (dedicatedInstances.size() >= maxDedicated) {
      orderedInstances.add(instance);
      continue;
    }

    // Work on a snapshot: the instance is mutated below via removeTopicPartition, which
    // would break the iteration if getServingTopicPartitionSet() exposes a live view.
    List<TopicPartition> servedPartitions =
        new ArrayList<>(instance.getServingTopicPartitionSet());
    boolean hasLag = false;
    for (TopicPartition tp : servedPartitions) {
      if (lagTimeMap.containsKey(tp)) {
        hasLag = true;
        break;
      }
    }
    if (!hasLag) {
      orderedInstances.add(instance);
      continue;
    }

    // this instance is a dedicated one for lagging partitions only
    dedicatedInstances.add(instance);
    for (TopicPartition tp : servedPartitions) {
      if (!lagTimeMap.containsKey(tp)) {
        instance.removeTopicPartition(tp);
        nonLaggingPartitions.add(tp);
      }
    }
  }
  if (!dedicatedInstances.isEmpty()) {
    LOGGER.info("balancePartitions: dedicated instances: " + dedicatedInstances);
  }

  // if the current assignment is overloaded on some workers, re-assign some partitions of
  // these workers
  ITopicWorkloadWeighter adjustedWeighter = (TopicPartition tp) -> {
    // base weight 1.0, plus 1.0 per 60 units of recorded lag, capped at 120 extra
    // (per the original note: one per minute of lag, up to 2 hours)
    Long lag = lagTimeMap.get(tp);
    if (lag == null) {
      return 1.0;
    }
    return 1.0 + Math.min(120, lag / 60);
  };

  // lagging partitions and stuck partitions that were just moved must not be moved again
  Set<TopicPartition> pinnedPartitions = new HashSet<>(lagTimeMap.keySet());
  if (stuckPartitionsMoved != null) {
    pinnedPartitions.addAll(stuckPartitionsMoved);
  }
  List<TopicPartition> reassignedPartitions = removeOverloadedParitions(orderedInstances,
      nonLaggingPartitions, pinnedPartitions, forced, adjustedWeighter);
  nonLaggingPartitions.addAll(reassignedPartitions);
  if (assignPartitions(orderedInstances, nonLaggingPartitions)) {
    assignmentChanged = true;
  }
  return assignmentChanged;
}