public static boolean validateSolution()

in storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java [129:247]


    /**
     * Checks whether the scheduling recorded in {@code cluster} for {@code topo} is valid.
     *
     * <p>Three checks are run; every violation found is collected and logged together at ERROR level:
     * <ol>
     *   <li>Max node co-location: for each component that has an entry in
     *       {@code ConstraintSolverConfig.getMaxNodeCoLocationCnts()}, the number of its executors on a
     *       single node must not exceed the configured count.</li>
     *   <li>Incompatible components: two different components listed as incompatible in
     *       {@code ConstraintSolverConfig.getIncompatibleComponentSets()} must not share a worker slot.</li>
     *   <li>Resources: a node hosting executors of this topology must not report negative available memory
     *       or CPU, and its available resources must equal its total resources minus the summed requests
     *       of this topology's executors placed on it.</li>
     * </ol>
     *
     * <p>An executor whose slot or node cannot be found in the cluster's node view is reported as a
     * validation error rather than causing a {@code NullPointerException}.
     *
     * @param cluster the cluster holding the assignment to validate
     * @param topo    the topology whose assignment is validated
     * @return {@code true} if no violation was found, {@code false} otherwise
     * @throws AssertionError if the cluster has no assignment for the topology
     */
    public static boolean validateSolution(Cluster cluster, TopologyDetails topo) {
        LOG.debug("Checking for a valid scheduling for topology {}...", topo.getName());
        // Look the assignment up once; all three checks below work on the same object.
        SchedulerAssignment schedulerAssignment = cluster.getAssignmentById(topo.getId());
        if (schedulerAssignment == null) {
            LOG.error("cluster.getAssignmentById(\"{}\") returned null", topo.getId());
            throw new AssertionError("No assignments for topologyId " + topo.getId());
        }

        // Snapshot of executor -> slot shared by every check; a missing map is treated as "nothing scheduled".
        Map<ExecutorDetails, WorkerSlot> execToWorker = new HashMap<>();
        Map<ExecutorDetails, WorkerSlot> assignedSlots = schedulerAssignment.getExecutorToSlot();
        if (assignedSlots != null) {
            execToWorker.putAll(assignedSlots);
        }

        ConstraintSolverConfig constraintSolverConfig = new ConstraintSolverConfig(topo);
        Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent();

        // Node view of the cluster, built once and reused by the co-location and resource checks.
        Map<String, RasNode> nodes = RasNodes.getAllNodesFrom(cluster);
        Map<WorkerSlot, RasNode> workerToNodes = new HashMap<>();
        for (RasNode node : nodes.values()) {
            for (WorkerSlot usedSlot : node.getUsedSlots()) {
                workerToNodes.put(usedSlot, node);
            }
        }

        List<String> errors = new ArrayList<>();

        // First check NodeCoLocationCnt constraints
        Map<String, Integer> maxNodeCoLocationCnts = constraintSolverConfig.getMaxNodeCoLocationCnts();
        Map<String, Map<String, Integer>> nodeCompMap = new HashMap<>(); // nodeId -> component -> executor count
        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : execToWorker.entrySet()) {
            ExecutorDetails exec = entry.getKey();
            String comp = execToComp.get(exec);
            WorkerSlot worker = entry.getValue();
            RasNode node = workerToNodes.get(worker);
            if (node == null) {
                // The assigned slot is not a used slot of any known node: report it instead of dereferencing null.
                errors.add(String.format("MaxNodeCoLocation: Component %s (exec=%s) is assigned to WorkerSlot %s "
                        + "which is not a used slot of any node", comp, exec, worker));
                continue;
            }

            Integer allowedColocationMaxCnt = maxNodeCoLocationCnts.get(comp);
            if (allowedColocationMaxCnt == null) {
                continue; // component has no co-location limit
            }
            String nodeId = node.getId();
            int cnt = nodeCompMap.computeIfAbsent(nodeId, (k) -> new HashMap<>()).merge(comp, 1, Integer::sum);
            if (allowedColocationMaxCnt < cnt) {
                errors.add(String.format("MaxNodeCoLocation: Component %s (exec=%s) on node %s, cnt %d > allowed %d",
                        comp, exec, nodeId, cnt, allowedColocationMaxCnt));
            }
        }

        // Second check IncompatibileComponent Constraints
        Map<String, Set<String>> incompatibleComponentSets = constraintSolverConfig.getIncompatibleComponentSets();
        Map<WorkerSlot, Set<String>> workerCompMap = new HashMap<>();
        execToWorker.forEach((exec, worker) ->
                workerCompMap.computeIfAbsent(worker, (k) -> new HashSet<>()).add(execToComp.get(exec)));
        for (Map.Entry<WorkerSlot, Set<String>> entry : workerCompMap.entrySet()) {
            Set<String> comps = entry.getValue();
            for (String comp1 : comps) {
                Set<String> incompatibleWithComp1 = incompatibleComponentSets.get(comp1);
                if (incompatibleWithComp1 == null) {
                    continue; // comp1 has no declared incompatibilities
                }
                for (String comp2 : comps) {
                    if (!comp1.equals(comp2) && incompatibleWithComp1.contains(comp2)) {
                        errors.add(String.format("IncompatibleComponents: %s and %s on WorkerSlot: %s",
                                comp1, comp2, entry.getKey()));
                    }
                }
            }
        }

        // Third check resources
        // Group executors per node first so that each node is examined (and reported) exactly once.
        Map<RasNode, Collection<ExecutorDetails>> nodeToExecs = new HashMap<>();
        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : execToWorker.entrySet()) {
            ExecutorDetails exec = entry.getKey();
            WorkerSlot worker = entry.getValue();
            RasNode node = nodes.get(worker.getNodeId());
            if (node == null) {
                errors.add(String.format("Unknown Node: Executor %s is assigned to WorkerSlot %s on a node "
                        + "that is not part of the cluster", exec, worker));
                continue;
            }
            nodeToExecs.computeIfAbsent(node, (k) -> new HashSet<>()).add(exec);
        }

        for (Map.Entry<RasNode, Collection<ExecutorDetails>> entry : nodeToExecs.entrySet()) {
            RasNode node = entry.getKey();
            Collection<ExecutorDetails> execs = entry.getValue();

            // A node that is already over-committed is reported once and excluded from the accounting check.
            if (node.getAvailableMemoryResources() < 0.0) {
                errors.add(String.format("Resource Exhausted: Found node %s with negative available memory %,.2f",
                        node.getId(), node.getAvailableMemoryResources()));
                continue;
            }
            if (node.getAvailableCpuResources() < 0.0) {
                errors.add(String.format("Resource Exhausted: Found node %s with negative available CPU %,.2f",
                        node.getId(), node.getAvailableCpuResources()));
                continue;
            }

            double cpuUsed = 0.0;
            double memoryUsed = 0.0;
            for (ExecutorDetails exec : execs) {
                cpuUsed += topo.getTotalCpuReqTask(exec);
                memoryUsed += topo.getTotalMemReqTask(exec);
            }
            // NOTE(review): only this topology's executors are summed, and the comparison is exact on doubles.
            // Presumably nodes are not shared with other topologies when this is called and requests are
            // exactly representable - confirm before relying on this for shared nodes or fractional requests.
            double expectedCpu = node.getTotalCpuResources() - cpuUsed;
            if (node.getAvailableCpuResources() != expectedCpu) {
                errors.add(String.format("Incorrect CPU Resources: Node %s CPU available is %,.2f, expected %,.2f, "
                                + "Executors scheduled on node: %s",
                        node.getId(), node.getAvailableCpuResources(), expectedCpu, execs));
            }
            double expectedMemory = node.getTotalMemoryResources() - memoryUsed;
            if (node.getAvailableMemoryResources() != expectedMemory) {
                errors.add(String.format("Incorrect Memory Resources: Node %s Memory available is %,.2f, expected %,.2f, "
                                + "Executors scheduled on node: %s",
                        node.getId(), node.getAvailableMemoryResources(), expectedMemory, execs));
            }
        }

        if (!errors.isEmpty()) {
            LOG.error("Topology {} solution is invalid\n\t{}", topo.getName(), String.join("\n\t", errors));
        }
        return errors.isEmpty();
    }