in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/AutoRebalanceLiveInstanceChangeListener.java [254:369]
/**
 * Builds a new instance-to-partition assignment after the set of usable instances changed.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Derive the live, removed and idle instance name sets through the sibling helpers.</li>
 *   <li>Decide how many idle instances may be put to work; a non-zero
 *       {@code _maxWorkingInstances} caps that number.</li>
 *   <li>Bump {@code _numIdleInstances} by the difference between the idle instances left
 *       unused and the counter's current count.</li>
 *   <li>Return early when nothing relevant changed (see {@code @return}).</li>
 *   <li>Keep the workload of surviving instances, hand the whole workload of a removed
 *       instance to one idle instance while the budget lasts, and queue everything else for
 *       redistribution.</li>
 *   <li>Either shed instances above the cap or recruit remaining idle instances, then
 *       delegate to {@code balancePartitions}.</li>
 *   <li>Append an empty holder for every removed instance.</li>
 * </ol>
 *
 * @param liveInstances              forwarded to {@code getLiveInstanceName}
 * @param excludedInstances          forwarded to {@code getLiveInstanceName}; also logged
 * @param instanceToTopicPartitionMap current assignment keyed by instance name; an entry with
 *                                   an empty set is skipped as idle
 * @param unassignedTopicPartitions  partitions that currently have no owner; they seed the
 *                                   list of partitions to (re)assign
 * @param blacklistedTopicPartitions forwarded unchanged to {@code balancePartitions}
 * @param checkInstanceChange        when {@code true}, enables the "no instance change" early
 *                                   return
 * @param forced                     when {@code true}, disables the early return and is passed
 *                                   on to {@code balancePartitions}
 * @return the new set of holders, or {@code null} when the early-return condition holds or
 *         when no step reported a change to the assignment
 */
private Set<InstanceTopicPartitionHolder> rescaleInstanceToTopicPartitionMap(
    List<LiveInstance> liveInstances,
    List<String> excludedInstances,
    Map<String, Set<TopicPartition>> instanceToTopicPartitionMap,
    Set<TopicPartition> unassignedTopicPartitions,
    Set<TopicPartition> blacklistedTopicPartitions,
    boolean checkInstanceChange,
    boolean forced) {
  Set<String> liveNames = getLiveInstanceName(liveInstances, excludedInstances);
  // departed also contains the excluded instances that currently have workload
  Set<String> departed =
      getRemovedInstanceSet(liveNames, instanceToTopicPartitionMap.keySet());
  Set<String> idlePool = getIdleInstanceSet(liveNames, instanceToTopicPartitionMap);

  // How many idle instances may be put to work: all of them when no cap is configured,
  // otherwise the remaining headroom under the cap, clamped to [0, idlePool.size()].
  int idleBudget = idlePool.size();
  if (_maxWorkingInstances != 0) {
    int headroom =
        _maxWorkingInstances - (instanceToTopicPartitionMap.size() - departed.size());
    idleBudget = Math.max(0, Math.min(headroom, idlePool.size()));
  }

  // idleBudget is never negative here, so one expression covers both the "some idle
  // instances get used" and the "none get used" case.
  _numIdleInstances.inc(idlePool.size() - idleBudget - _numIdleInstances.getCount());

  boolean withinWorkerCap = _maxWorkingInstances <= 0
      || _maxWorkingInstances >= instanceToTopicPartitionMap.size();
  boolean noInstanceChange = !forced
      && idleBudget <= 0
      && checkInstanceChange
      && departed.isEmpty()
      && withinWorkerCap;
  if (noInstanceChange) {
    return null;
  }

  LOGGER.info("Trying to rescale cluster with " + liveNames.size() + " live instances ("
      + excludedInstances.size() + " excluded - " + excludedInstances + "), using "
      + idleBudget + " out of " + idlePool.size() + " idle instances, and removed "
      + departed.size() + " instances - "
      + Arrays.toString(departed.toArray(new String[0]))
      + " for unassigned partitions: " + unassignedTopicPartitions);

  Set<InstanceTopicPartitionHolder> holders = new HashSet<>();
  List<TopicPartition> pending = new ArrayList<>(unassignedTopicPartitions);
  boolean changed = false;

  for (Map.Entry<String, Set<TopicPartition>> assignment
      : instanceToTopicPartitionMap.entrySet()) {
    String owner = assignment.getKey();
    Set<TopicPartition> workload = assignment.getValue();

    if (workload.isEmpty()) {
      // idle instance, nothing to carry over
      continue;
    }
    if (!departed.contains(owner)) {
      // surviving instance keeps its assignment as is
      InstanceTopicPartitionHolder kept = new InstanceTopicPartitionHolder(owner);
      kept.addTopicPartitions(workload);
      holders.add(kept);
      continue;
    }
    if (idlePool.isEmpty() || idleBudget <= 0) {
      // no idle instance available: spread the workload over all instances later
      pending.addAll(workload);
      continue;
    }

    // hand the complete workload of the removed instance to one idle instance
    String successor = idlePool.iterator().next();
    idlePool.remove(successor);
    idleBudget--;
    InstanceTopicPartitionHolder takeover = new InstanceTopicPartitionHolder(successor);
    takeover.addTopicPartitions(workload);
    holders.add(takeover);
    changed = true;
    LOGGER.info(
        "Move workload from instance " + owner + " to " + successor + ": " + workload);
  }

  boolean forceRebalance = forced;
  if (_maxWorkingInstances > 0 && holders.size() > _maxWorkingInstances) {
    // more working instances than allowed: drain the surplus back into the idle pool
    Iterator<InstanceTopicPartitionHolder> surplus = holders.iterator();
    while (holders.size() > _maxWorkingInstances && surplus.hasNext()) {
      InstanceTopicPartitionHolder drained = surplus.next();
      LOGGER.info("Move workload from instance " + drained.getInstanceName()
          + " to become idle: " + drained.getServingTopicPartitionSet());
      pending.addAll(drained.getServingTopicPartitionSet());
      surplus.remove();
    }
  } else if (idleBudget > 0) {
    // recruit further idle instances as empty holders for the rebalance
    Iterator<String> spare = idlePool.iterator();
    while (idleBudget > 0 && spare.hasNext()) {
      holders.add(new InstanceTopicPartitionHolder(spare.next()));
      // force the rebalance so that partitions actually land on the new instances
      forceRebalance = true;
      idleBudget--;
    }
  }

  boolean rebalanced =
      balancePartitions(holders, pending, blacklistedTopicPartitions, forceRebalance);
  if (rebalanced) {
    changed = true;
  }

  // add an empty task for every removed/excluded instance
  for (String gone : departed) {
    holders.add(new InstanceTopicPartitionHolder(gone));
  }
  if (!departed.isEmpty()) {
    changed = true;
  }

  if (!changed) {
    return null;
  }
  return holders;
}