in stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerController.java [72:192]
/**
 * Computes the storage-container assignment for the servers that are currently alive.
 *
 * <p>The computation proceeds as follows:
 * <ol>
 *   <li>If {@code currentCluster} is empty, {@code currentState} is returned untouched.</li>
 *   <li>The server keys of {@code currentState} are parsed into {@link BookieId}s. If parsing
 *       fails, the existing assignment is discarded and treated as empty.</li>
 *   <li>If no server holds an assignment, the result of
 *       {@code initializeIdealState(clusterMetadata, currentCluster)} is returned.</li>
 *   <li>If the set of assigned servers equals {@code currentCluster}, {@code currentState}
 *       is returned untouched.</li>
 *   <li>Otherwise the containers held by servers that left the cluster are handed out one by
 *       one to the head of the assignment queue, and containers are then moved from the tail
 *       of the queue to its head until their sizes differ by no more than the allowed
 *       difference.</li>
 * </ol>
 *
 * <p>The queue order is defined by {@code ServerAssignmentDataComparator}; this method relies
 * on it placing the server with the fewest containers first and the one with the most last.
 *
 * @param clusterMetadata cluster metadata; supplies the total number of storage containers
 * @param currentState the assignment currently in effect
 * @param currentCluster the servers that are currently alive
 * @return the new assignment, or {@code currentState} itself when nothing has to change
 */
public ClusterAssignmentData computeIdealState(ClusterMetadata clusterMetadata,
                                               ClusterAssignmentData currentState,
                                               Set<BookieId> currentCluster) {
    if (currentCluster.isEmpty()) {
        log.info("Current cluster is empty. No alive server is found.");
        return currentState;
    }

    // 1. get current server assignments
    Map<BookieId, Set<Long>> currentServerAssignments;
    try {
        currentServerAssignments = currentState.getServersMap()
            .entrySet()
            .stream()
            .collect(Collectors.toMap(
                e1 -> BookieId.parse(e1.getKey()),
                e2 -> e2.getValue().getContainersList().stream().collect(Collectors.toSet())
            ));
    } catch (UncheckedExecutionException | IllegalArgumentException e) {
        // NOTE(review): BookieId.parse is expected to reject a malformed id with
        // IllegalArgumentException - confirm against the BookieId implementation.
        // UncheckedExecutionException is still handled for backward compatibility.
        // The cause may be null, so fall back to the exception's own message instead of
        // failing with a NullPointerException inside the recovery path.
        Throwable cause = e.getCause();
        String reason = (cause != null ? cause : e).getMessage();
        log.warn("Invalid cluster assignment data is found : {} - {}. Recompute assignment from empty state",
            currentState, reason);
        currentServerAssignments = Maps.newHashMap();
    }
    Set<BookieId> currentServersAssigned = currentServerAssignments.keySet();

    // 2. if no server is assigned, initialize the ideal state
    if (currentServersAssigned.isEmpty()) {
        return initializeIdealState(clusterMetadata, currentCluster);
    }

    // 3. get the cluster diffs
    Set<BookieId> serversAdded =
        Sets.difference(currentCluster, currentServersAssigned).immutableCopy();
    Set<BookieId> serversRemoved =
        Sets.difference(currentServersAssigned, currentCluster).immutableCopy();
    if (serversAdded.isEmpty() && serversRemoved.isEmpty()) {
        // cluster is unchanged, assuming the current state is ideal, no re-assignment is required.
        return currentState;
    }
    log.info("Storage container controller detects cluster changed:\n"
            + "\t {} servers added: {}\n\t {} servers removed: {}",
        serversAdded.size(), serversAdded, serversRemoved.size(), serversRemoved);

    // 4. collect the containers owned by the removed servers; these have to be reassigned.
    Set<Long> containersToReassign = currentServerAssignments.entrySet().stream()
        .filter(serverEntry -> !currentCluster.contains(serverEntry.getKey()))
        .flatMap(serverEntry -> serverEntry.getValue().stream())
        .collect(Collectors.toSet());

    // 5. use an ordered set as a priority deque to sort the servers by the number of assigned containers
    TreeSet<Pair<BookieId, LinkedList<Long>>> assignmentQueue =
        new TreeSet<>(new ServerAssignmentDataComparator());
    for (Map.Entry<BookieId, Set<Long>> entry : currentServerAssignments.entrySet()) {
        BookieId host = entry.getKey();
        if (!currentCluster.contains(host)) {
            log.trace("Host {} is not in current cluster anymore", host);
            continue;
        }
        log.trace("Adding host {} to assignment queue", host);
        assignmentQueue.add(Pair.of(host, Lists.newLinkedList(entry.getValue())));
    }

    // 6. add new servers, which start without any container
    for (BookieId server : serversAdded) {
        assignmentQueue.add(Pair.of(server, Lists.newLinkedList()));
    }

    // 7. assign the containers that need to be reassigned.
    //    An entry is always taken out of the queue before its list is modified and put back
    //    afterwards, so the ordered set never holds an element whose sort key has changed.
    for (Long containerId : containersToReassign) {
        Pair<BookieId, LinkedList<Long>> leastLoadedServer = assignmentQueue.pollFirst();
        leastLoadedServer.getValue().add(containerId);
        assignmentQueue.add(leastLoadedServer);
    }

    // 8. rebalance the containers if needed.
    //    A difference of one container between servers is tolerated when there are more servers
    //    than containers, or when the containers cannot be divided evenly among the servers.
    int numContainers = clusterMetadata.getNumStorageContainers();
    int numServers = assignmentQueue.size();
    int diffAllowed;
    if (numServers > numContainers) {
        diffAllowed = 1;
    } else {
        diffAllowed = numContainers % numServers == 0 ? 0 : 1;
    }
    Pair<BookieId, LinkedList<Long>> leastLoaded = assignmentQueue.first();
    Pair<BookieId, LinkedList<Long>> mostLoaded = assignmentQueue.last();
    while (mostLoaded.getValue().size() - leastLoaded.getValue().size() > diffAllowed) {
        leastLoaded = assignmentQueue.pollFirst();
        mostLoaded = assignmentQueue.pollLast();
        // move container from mostLoaded to leastLoaded
        Long containerId = mostLoaded.getValue().removeFirst();
        // add the container to the end to avoid balancing this container again.
        leastLoaded.getValue().addLast(containerId);
        assignmentQueue.add(leastLoaded);
        assignmentQueue.add(mostLoaded);
        leastLoaded = assignmentQueue.first();
        mostLoaded = assignmentQueue.last();
    }

    // 9. the new ideal state is computed, finalize it
    Map<String, ServerAssignmentData> newAssignmentMap = Maps.newHashMap();
    for (Pair<BookieId, LinkedList<Long>> assignment : assignmentQueue) {
        newAssignmentMap.put(
            assignment.getKey().toString(),
            ServerAssignmentData.newBuilder()
                .addAllContainers(assignment.getValue())
                .build());
    }
    return ClusterAssignmentData.newBuilder()
        .putAllServers(newAssignmentMap)
        .build();
}