in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/AutoRebalanceLiveInstanceChangeListener.java [594:630]
/**
 * Distributes the given partitions across the given instances.
 *
 * <p>The partition list is sorted in place, in the reverse of the order defined by
 * {@code TopicPartition.getWorkloadComparator(...)}. It is then walked as a sequence of runs,
 * where a run is a maximal stretch of consecutive partitions whose topics are equal. For each
 * run, up to as many instances as the run has partitions are polled from the head of
 * {@code orderedInstances}, the run's partitions are handed to them round-robin, and the
 * instances are put back into the set.
 *
 * @param orderedInstances candidate instances; the head of the set is taken first (presumably
 *     the least-loaded instance -- the set's ordering is defined by the caller)
 * @param partitionsToBeAssigned partitions to hand out; this list is re-ordered by the call
 * @return {@code false} if either argument is empty (nothing is done), {@code true} otherwise
 */
private boolean assignPartitions(TreeSet<InstanceTopicPartitionHolder> orderedInstances,
    List<TopicPartition> partitionsToBeAssigned) {
  if (orderedInstances.isEmpty() || partitionsToBeAssigned.isEmpty()) {
    return false;
  }

  // Order the partitions by workload, high -> low.
  Collections.sort(partitionsToBeAssigned,
      Collections.reverseOrder(
          TopicPartition
              .getWorkloadComparator(_helixMirrorMakerManager.getWorkloadInfoRetriever())));

  // Spread partitions of the same topic over different workers where possible.
  final int total = partitionsToBeAssigned.size();
  int runStart = 0;
  while (runStart < total) {
    // Locate the end (exclusive) of the run of adjacent partitions sharing the first one's topic.
    TopicPartition head = partitionsToBeAssigned.get(runStart);
    int runEnd = runStart + 1;
    while (runEnd < total
        && partitionsToBeAssigned.get(runEnd).getTopic().equals(head.getTopic())) {
      runEnd++;
    }
    final int runLength = runEnd - runStart;

    // Take instances out of the set before changing them; they are re-inserted afterwards so
    // the set can place them according to their updated state.
    List<InstanceTopicPartitionHolder> picked = new ArrayList<>();
    while (picked.size() < runLength && !orderedInstances.isEmpty()) {
      picked.add(orderedInstances.pollFirst());
    }

    // Round-robin: wraps around when the run has more partitions than picked instances.
    for (int offset = 0; offset < runLength; offset++) {
      InstanceTopicPartitionHolder target = picked.get(offset % picked.size());
      target.addTopicPartition(partitionsToBeAssigned.get(runStart + offset));
    }

    orderedInstances.addAll(picked);
    runStart = runEnd;
  }
  return true;
}