in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/AutoRebalanceLiveInstanceChangeListener.java [642:753]
/**
 * Takes partitions away from instances whose weighted workload exceeds the allowed maximum and
 * returns them so that the caller can assign them elsewhere.
 *
 * <p>The method works in four steps:
 * <ol>
 *   <li>Sum the weighted per-partition workload of every partition in
 *       {@code partitionsToBeAssigned} and of every partition currently served by an instance in
 *       {@code orderedInstances}. Partitions whose weight is not positive are ignored.</li>
 *   <li>Compute the average workload per instance, then recompute it after excluding instances
 *       that serve exactly one weighted partition whose workload alone exceeds the average (that
 *       workload cannot be split across workers).</li>
 *   <li>Derive the maximum allowed workload: the average itself when {@code forced} is set and
 *       {@code _overloadedRatioThreshold > 1.0}, otherwise the average multiplied by
 *       {@code _overloadedRatioThreshold}.</li>
 *   <li>Poll instances from the tail of {@code orderedInstances}; for each one above the maximum,
 *       remove randomly chosen, non-pinned, weighted partitions until the removed workload covers
 *       the excess. Processing stops at the first instance (with more than one weighted
 *       partition) that is not above the maximum.</li>
 * </ol>
 *
 * <p>Side effects: removed partitions are dropped from their instance holder via
 * {@code removeTopicPartition}, and every polled instance is added back to
 * {@code orderedInstances} before returning so the set is re-sorted.
 *
 * @param orderedInstances instances to inspect; polled from the tail, so the set is presumably
 *     ordered by ascending workload (the comparator is not visible here - confirm)
 * @param partitionsToBeAssigned partitions waiting for assignment; only contribute to the total
 *     workload
 * @param pinnedPartitions partitions that must never be removed; may be {@code null}
 * @param forced when {@code true} and the configured threshold is above 1.0, rebalance down to the
 *     plain average instead of {@code average * threshold}
 * @param weighter supplies the weight of each partition; a non-positive weight means the partition
 *     is not considered for workload
 * @return the partitions removed from overloaded instances; empty when no weighted partition exists
 */
private List<TopicPartition> removeOverloadedParitions(
    TreeSet<InstanceTopicPartitionHolder> orderedInstances,
    List<TopicPartition> partitionsToBeAssigned, Set<TopicPartition> pinnedPartitions,
    boolean forced,
    ITopicWorkloadWeighter weighter) {
  List<TopicPartition> overloaded = new ArrayList<>();
  WorkloadInfoRetriever retriever = _helixMirrorMakerManager.getWorkloadInfoRetriever();
  TopicWorkload totalWorkload = new TopicWorkload(0, 0, 0);
  boolean noPartitions = true;

  // Workload of the partitions that are still waiting for an owner.
  for (TopicPartition tp : partitionsToBeAssigned) {
    double weight = weighter.partitionWeight(tp);
    // weight == 0 means the partition is not considered for workload
    if (weight > 0) {
      TopicWorkload tw = retriever.topicWorkload(tp.getTopic());
      totalWorkload.add(weight * tw.getBytesPerSecondPerPartition(),
          weight * tw.getMsgsPerSecondPerPartition());
      noPartitions = false;
    }
  }

  // instancesPartitionsCount counts how many partitions are considered for workload for each
  // instance
  Map<String, Integer> instancesPartitionsCount = new HashMap<>();
  for (InstanceTopicPartitionHolder instance : orderedInstances) {
    int partitionCount = 0;
    for (TopicPartition tp : instance.getServingTopicPartitionSet()) {
      double weight = weighter.partitionWeight(tp);
      if (weight > 0) {
        partitionCount++;
        TopicWorkload tw = retriever.topicWorkload(tp.getTopic());
        totalWorkload.add(weight * tw.getBytesPerSecondPerPartition(),
            weight * tw.getMsgsPerSecondPerPartition());
        noPartitions = false;
      }
    }
    instancesPartitionsCount.put(instance.getInstanceName(), partitionCount);
  }
  if (noPartitions) {
    // Nothing carries workload, so nothing can be overloaded.
    return overloaded;
  }

  TopicWorkload averageWorkload = new TopicWorkload(
      totalWorkload.getBytesPerSecond() / orderedInstances.size(),
      totalWorkload.getMsgsPerSecond() / orderedInstances.size());

  // adjust average by excluding the instances that have a single partition but exceeds the
  // average workload because the workload cannot be further divided to multiple workers
  int excludeInstances = 0;
  for (InstanceTopicPartitionHolder instance : orderedInstances) {
    if (instancesPartitionsCount.get(instance.getInstanceName()) != 1) {
      continue;
    }
    for (TopicPartition tp : instance.getServingTopicPartitionSet()) {
      double weight = weighter.partitionWeight(tp);
      if (weight > 0) {
        TopicWorkload tw = retriever.topicWorkload(tp.getTopic());
        double partitionBytes = weight * tw.getBytesPerSecondPerPartition();
        double partitionMsgs = weight * tw.getMsgsPerSecondPerPartition();
        // Compare what this instance actually carries (the weighted per-partition share), not the
        // whole topic's workload; this is also exactly the amount subtracted below.
        TopicWorkload partitionWorkload = new TopicWorkload(partitionBytes, partitionMsgs);
        if (partitionWorkload.compareTotal(averageWorkload) > 0) {
          excludeInstances++;
          totalWorkload.setMsgsPerSecond(totalWorkload.getMsgsPerSecond() - partitionMsgs);
          totalWorkload.setBytesPerSecond(totalWorkload.getBytesPerSecond() - partitionBytes);
        }
        // The instance has exactly one weighted partition, so the search is over.
        break;
      }
    }
  }
  int numInstances = orderedInstances.size() - excludeInstances;
  if (numInstances > 0) {
    averageWorkload = new TopicWorkload(totalWorkload.getBytesPerSecond() / numInstances,
        totalWorkload.getMsgsPerSecond() / numInstances);
  }

  TopicWorkload maxWorkload = (forced && _overloadedRatioThreshold > 1.0) ? averageWorkload
      : new TopicWorkload(averageWorkload.getBytesPerSecond() * _overloadedRatioThreshold,
          averageWorkload.getMsgsPerSecond() * _overloadedRatioThreshold);

  // Instances are taken out of the set while they are modified and re-inserted at the end,
  // because removing partitions changes the workload the set is presumably sorted by.
  List<InstanceTopicPartitionHolder> processedInstances = new ArrayList<>();
  while (!orderedInstances.isEmpty()) {
    InstanceTopicPartitionHolder highest = orderedInstances.pollLast();
    processedInstances.add(highest);
    if (instancesPartitionsCount.get(highest.getInstanceName()) <= 1) {
      // no need to rebalance to other worker because it is a single or no partition
      continue;
    }
    TopicWorkload workerWorkload = highest.totalWorkload(retriever, weighter);
    if (workerWorkload.compareTotal(maxWorkload) <= 0) {
      // This instance is within the limit; stop scanning the remaining instances.
      break;
    }
    // Excess workload that has to leave this instance.
    TopicWorkload diff = new TopicWorkload(
        workerWorkload.getBytesPerSecond() - maxWorkload.getBytesPerSecond(),
        workerWorkload.getMsgsPerSecond() - maxWorkload.getMsgsPerSecond());
    TopicWorkload workloadToRemove = new TopicWorkload(0, 0, 0);
    List<TopicPartition> partitions = new ArrayList<>(highest.getServingTopicPartitionSet());
    Collections.shuffle(partitions); // choose random partitions
    for (TopicPartition tp : partitions) {
      double weight = weighter.partitionWeight(tp);
      // Use the same "weight > 0" rule as the accounting above so a partition that was never
      // counted as workload is never removed either; pinned partitions always stay.
      if (!(weight > 0) || (pinnedPartitions != null && pinnedPartitions.contains(tp))) {
        continue;
      }
      TopicWorkload tpWorkload = retriever.topicWorkload(tp.getTopic());
      workloadToRemove.add(weight * tpWorkload.getBytesPerSecondPerPartition(),
          weight * tpWorkload.getMsgsPerSecondPerPartition());
      highest.removeTopicPartition(tp);
      overloaded.add(tp);
      if (workloadToRemove.compareTotal(diff) >= 0) {
        break;
      }
    }
  }
  // re-sort the list after removing partitions
  orderedInstances.addAll(processedInstances);
  return overloaded;
}