private boolean auditAssignmentChanges()

in storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java [843:940]


    private boolean auditAssignmentChanges(Map<String, Assignment> existingAssignments,
                                           Map<String, Assignment> newAssignments) {
        boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty();
        long numRemovedExec = 0;
        long numRemovedSlot = 0;
        long numAddedExec   = 0;
        long numAddedSlot   = 0;
        if (existingAssignments.isEmpty()) {
            for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
                final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
                final long count = new HashSet<>(execToPort.values()).size();
                LOG.info("Assigning {} to {} slots", entry.getKey(), count);
                LOG.info("Assign executors: {}", execToPort.keySet());
                numAddedSlot += count;
                numAddedExec += execToPort.size();
            }
        } else if (newAssignments.isEmpty()) {
            for (Entry<String, Assignment> entry : existingAssignments.entrySet()) {
                final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
                final long count = new HashSet<>(execToPort.values()).size();
                LOG.info("Removing {} from {} slots", entry.getKey(), count);
                LOG.info("Remove executors: {}", execToPort.keySet());
                numRemovedSlot += count;
                numRemovedExec += execToPort.size();
            }
        } else {
            MapDifference<String, Assignment> difference = Maps.difference(existingAssignments, newAssignments);
            if (anyChanged = !difference.areEqual()) {
                for (Entry<String, Assignment> entry : difference.entriesOnlyOnLeft().entrySet()) {
                    final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
                    final long count = new HashSet<>(execToPort.values()).size();
                    LOG.info("Removing {} from {} slots", entry.getKey(), count);
                    LOG.info("Remove executors: {}", execToPort.keySet());
                    numRemovedSlot += count;
                    numRemovedExec += execToPort.size();
                }
                for (Entry<String, Assignment> entry : difference.entriesOnlyOnRight().entrySet()) {
                    final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
                    final long count = new HashSet<>(execToPort.values()).size();
                    LOG.info("Assigning {} to {} slots", entry.getKey(), count);
                    LOG.info("Assign executors: {}", execToPort.keySet());
                    numAddedSlot += count;
                    numAddedExec += execToPort.size();
                }
                for (Entry<String, MapDifference.ValueDifference<Assignment>> entry : difference.entriesDiffering().entrySet()) {
                    final Map<List<Long>, NodeInfo> execToSlot = entry.getValue().rightValue().get_executor_node_port();
                    final Set<NodeInfo> slots = new HashSet<>(execToSlot.values());
                    LOG.info("Reassigning {} to {} slots", entry.getKey(), slots.size());
                    LOG.info("Reassign executors: {}", execToSlot.keySet());

                    final Map<List<Long>, NodeInfo> oldExecToSlot = entry.getValue().leftValue().get_executor_node_port();

                    long commonExecCount = 0;
                    Set<NodeInfo> commonSlots = new HashSet<>(execToSlot.size());
                    for (Entry<List<Long>, NodeInfo> execEntry : execToSlot.entrySet()) {
                        if (execEntry.getValue().equals(oldExecToSlot.get(execEntry.getKey()))) {
                            commonExecCount++;
                            commonSlots.add(execEntry.getValue());
                        }
                    }
                    long commonSlotCount = commonSlots.size();

                    //Treat reassign as remove and add
                    numRemovedSlot += new HashSet<>(oldExecToSlot.values()).size() - commonSlotCount;
                    numRemovedExec += oldExecToSlot.size() - commonExecCount;
                    numAddedSlot += slots.size() - commonSlotCount;
                    numAddedExec += execToSlot.size() - commonExecCount;
                }
            }
            LOG.debug("{} assignments unchanged: {}", difference.entriesInCommon().size(), difference.entriesInCommon().keySet());
        }
        numAddedExecPerScheduling.update(numAddedExec);
        numAddedSlotPerScheduling.update(numAddedSlot);
        numRemovedExecPerScheduling.update(numRemovedExec);
        numRemovedSlotPerScheduling.update(numRemovedSlot);
        numNetExecIncreasePerScheduling.update(numAddedExec - numRemovedExec);
        numNetSlotIncreasePerScheduling.update(numAddedSlot - numRemovedSlot);

        if (anyChanged) {
            LOG.info("Fragmentation after scheduling is: {} MB, {} PCore CPUs", fragmentedMemory(), fragmentedCpu());
            nodeIdToResources.get().forEach((id, node) -> {
                final double availableMem = node.getAvailableMem();
                if (availableMem < 0) {
                    LOG.warn("Memory over-scheduled on {}", id, availableMem);
                }
                final double availableCpu = node.getAvailableCpu();
                if (availableCpu < 0) {
                    LOG.warn("CPU over-scheduled on {}", id, availableCpu);
                }
                LOG.info(
                    "Node Id: {} Total Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used "
                        + "CPU: {}, Available CPU: {}, fragmented: {}",
                        id, node.getTotalMem(), node.getUsedMem(), availableMem,
                        node.getTotalCpu(), node.getUsedCpu(), availableCpu, isFragmented(node));
            });
        }
        return anyChanged;
    }