in storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java [843:940]
/**
 * Logs how the assignments differ between two scheduling rounds and records the totals.
 *
 * <p>For every topology that was added, removed or reassigned this logs the number of distinct slots
 * and the executors involved, accumulates the added/removed executor and slot counts, and passes
 * those totals to the {@code num*PerScheduling} fields. When anything changed it additionally logs
 * the fragmentation figures and the resources of every node, warning about nodes whose available
 * memory or CPU is negative.
 *
 * @param existingAssignments the assignments before scheduling, keyed by the same id as {@code newAssignments}
 * @param newAssignments the assignments after scheduling
 * @return {@code true} if exactly one of the maps is empty, or if both are non-empty and they differ;
 *     {@code false} otherwise (including when both are empty)
 */
private boolean auditAssignmentChanges(Map<String, Assignment> existingAssignments,
                                       Map<String, Assignment> newAssignments) {
    // Exactly one side being empty is a change; both empty (or both non-empty, decided below) is not.
    boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty();
    long numRemovedExec = 0;
    long numRemovedSlot = 0;
    long numAddedExec = 0;
    long numAddedSlot = 0;
    if (existingAssignments.isEmpty()) {
        // Nothing existed before: every new assignment is an addition.
        for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
            final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
            final long count = new HashSet<>(execToPort.values()).size();
            LOG.info("Assigning {} to {} slots", entry.getKey(), count);
            LOG.info("Assign executors: {}", execToPort.keySet());
            numAddedSlot += count;
            numAddedExec += execToPort.size();
        }
    } else if (newAssignments.isEmpty()) {
        // Nothing remains: every existing assignment is a removal.
        for (Entry<String, Assignment> entry : existingAssignments.entrySet()) {
            final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
            final long count = new HashSet<>(execToPort.values()).size();
            LOG.info("Removing {} from {} slots", entry.getKey(), count);
            LOG.info("Remove executors: {}", execToPort.keySet());
            numRemovedSlot += count;
            numRemovedExec += execToPort.size();
        }
    } else {
        MapDifference<String, Assignment> difference = Maps.difference(existingAssignments, newAssignments);
        anyChanged = !difference.areEqual();
        if (anyChanged) {
            // Keys present only in the old map were removed.
            for (Entry<String, Assignment> entry : difference.entriesOnlyOnLeft().entrySet()) {
                final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
                final long count = new HashSet<>(execToPort.values()).size();
                LOG.info("Removing {} from {} slots", entry.getKey(), count);
                LOG.info("Remove executors: {}", execToPort.keySet());
                numRemovedSlot += count;
                numRemovedExec += execToPort.size();
            }
            // Keys present only in the new map were added.
            for (Entry<String, Assignment> entry : difference.entriesOnlyOnRight().entrySet()) {
                final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
                final long count = new HashSet<>(execToPort.values()).size();
                LOG.info("Assigning {} to {} slots", entry.getKey(), count);
                LOG.info("Assign executors: {}", execToPort.keySet());
                numAddedSlot += count;
                numAddedExec += execToPort.size();
            }
            // Keys present on both sides with different values were reassigned.
            for (Entry<String, MapDifference.ValueDifference<Assignment>> entry : difference.entriesDiffering().entrySet()) {
                final Map<List<Long>, NodeInfo> execToSlot = entry.getValue().rightValue().get_executor_node_port();
                final Set<NodeInfo> slots = new HashSet<>(execToSlot.values());
                LOG.info("Reassigning {} to {} slots", entry.getKey(), slots.size());
                LOG.info("Reassign executors: {}", execToSlot.keySet());
                final Map<List<Long>, NodeInfo> oldExecToSlot = entry.getValue().leftValue().get_executor_node_port();
                // An executor is "common" when it maps to the same slot before and after;
                // the slots hosting such executors are the common slots.
                long commonExecCount = 0;
                Set<NodeInfo> commonSlots = new HashSet<>(execToSlot.size());
                for (Entry<List<Long>, NodeInfo> execEntry : execToSlot.entrySet()) {
                    if (execEntry.getValue().equals(oldExecToSlot.get(execEntry.getKey()))) {
                        commonExecCount++;
                        commonSlots.add(execEntry.getValue());
                    }
                }
                long commonSlotCount = commonSlots.size();
                //Treat reassign as remove and add
                numRemovedSlot += new HashSet<>(oldExecToSlot.values()).size() - commonSlotCount;
                numRemovedExec += oldExecToSlot.size() - commonExecCount;
                numAddedSlot += slots.size() - commonSlotCount;
                numAddedExec += execToSlot.size() - commonExecCount;
            }
        }
        LOG.debug("{} assignments unchanged: {}", difference.entriesInCommon().size(), difference.entriesInCommon().keySet());
    }
    // The totals are recorded on every call, even when nothing changed (all zeros in that case).
    numAddedExecPerScheduling.update(numAddedExec);
    numAddedSlotPerScheduling.update(numAddedSlot);
    numRemovedExecPerScheduling.update(numRemovedExec);
    numRemovedSlotPerScheduling.update(numRemovedSlot);
    numNetExecIncreasePerScheduling.update(numAddedExec - numRemovedExec);
    numNetSlotIncreasePerScheduling.update(numAddedSlot - numRemovedSlot);
    if (anyChanged) {
        LOG.info("Fragmentation after scheduling is: {} MB, {} PCore CPUs", fragmentedMemory(), fragmentedCpu());
        nodeIdToResources.get().forEach((id, node) -> {
            final double availableMem = node.getAvailableMem();
            if (availableMem < 0) {
                // Each argument needs its own placeholder, otherwise the value is dropped from the message.
                LOG.warn("Memory over-scheduled on {}, available memory: {}", id, availableMem);
            }
            final double availableCpu = node.getAvailableCpu();
            if (availableCpu < 0) {
                LOG.warn("CPU over-scheduled on {}, available CPU: {}", id, availableCpu);
            }
            LOG.info(
                "Node Id: {} Total Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used "
                + "CPU: {}, Available CPU: {}, fragmented: {}",
                id, node.getTotalMem(), node.getUsedMem(), availableMem,
                node.getTotalCpu(), node.getUsedCpu(), availableCpu, isFragmented(node));
        });
    }
    return anyChanged;
}