public void scheduleAsNeeded()

in storm-server/src/main/java/org/apache/storm/scheduler/multitenant/DefaultPool.java [128:219]


    /**
     * Tries to place every topology of this pool that the cluster reports as needing scheduling,
     * and records a status message on the cluster for each topology visited.
     *
     * <p>The slot target for a topology is the smaller of its executor count and its requested
     * worker count. This pool's own free slots are considered first; {@code lesserPools} are only
     * consulted (and nodes only taken from them) when the pool's free slots do not cover the
     * target.
     *
     * @param lesserPools pools that nodes may be taken from when this pool is short on free slots
     * @throws IllegalStateException if no candidate node with a free slot remains while the slot
     *     scheduler is still accepting assignments
     */
    public void scheduleAsNeeded(NodePool... lesserPools) {
        for (TopologyDetails td : tds.values()) {
            final String topId = td.getId();

            // Nothing to do for topologies the cluster already considers placed.
            if (!cluster.needsScheduling(td)) {
                cluster.setStatus(topId, "Fully Scheduled");
                continue;
            }

            LOG.debug("Scheduling topology {}", topId);
            final int executorCount = td.getExecutors().size();
            final int workersRequested = td.getNumWorkers();
            // More slots than executors would be pointless, so cap the target.
            final int slotTarget = Math.min(executorCount, workersRequested);
            final int slotsHeld = Node.countSlotsUsed(topId, nodes);
            int freeHere = Node.countFreeSlotsAlive(nodes);
            // Only ask the lesser pools what they can spare when our own free slots fall short.
            final int borrowable = (slotTarget > freeHere) ? NodePool.slotsAvailable(lesserPools) : 0;
            int slotBudget = Math.min(slotTarget - slotsHeld, freeHere + borrowable);
            final int unassigned = cluster.getUnassignedExecutors(td).size();
            LOG.debug("Slots... requested {} used {} free {} available {} to be used {}, executors not running {}",
                      slotTarget, slotsHeld, freeHere, borrowable, slotBudget, unassigned);

            if (slotBudget <= 0) {
                // No additional slot can be used; just report where the topology stands.
                final String stalledStatus;
                if (unassigned > 0) {
                    stalledStatus = "Not fully scheduled (No free slots in default pool) "
                            + unassigned
                            + " executors not scheduled";
                } else if (slotsHeld < slotTarget) {
                    stalledStatus = "Running with fewer slots than requested ("
                            + slotsHeld + "/"
                            + workersRequested + ")";
                } else {
                    stalledStatus = "Fully Scheduled (requested " + workersRequested
                            + " slots, but could only use " + slotsHeld + ")";
                }
                cluster.setStatus(topId, stalledStatus);
                continue;
            }

            // Pull in nodes from the lesser pools to cover whatever our free slots cannot.
            final int shortfall = slotBudget - freeHere;
            if (shortfall > 0) {
                nodes.addAll(NodePool.takeNodesBySlot(shortfall, lesserPools));
            }

            if (unassigned <= 0) {
                // Every executor already has a slot, yet more slots can be used: free the topology
                // on all nodes (presumably releasing its current slots - confirm against
                // Node.freeTopology) and budget against the full target again.
                for (Node member : nodes) {
                    member.freeTopology(topId, cluster);
                }
                freeHere = Node.countFreeSlotsAlive(nodes);
                slotBudget = Math.min(slotTarget, freeHere);
            }

            final RoundRobinSlotScheduler assigner = new RoundRobinSlotScheduler(td, slotBudget, cluster);

            // Work on a private copy so that dropping exhausted nodes does not touch the pool itself.
            final LinkedList<Node> candidates = new LinkedList<>(nodes);
            boolean keepAssigning = true;
            while (keepAssigning) {
                // Discard nodes at the head that have no free slot left.
                while (!candidates.isEmpty() && candidates.peekFirst().totalSlotsFree() == 0) {
                    candidates.removeFirst();
                }
                if (candidates.isEmpty()) {
                    throw new IllegalStateException("This should not happen, we"
                            + " messed up and did not get enough slots");
                }
                // The assigner returns false once it has nothing more to hand out.
                keepAssigning = assigner.assignSlotTo(candidates.peekFirst());
            }

            final int slotsHeldNow = Node.countSlotsUsed(topId, nodes);
            final String finalStatus;
            if (slotsHeldNow < slotTarget) {
                finalStatus = "Running with fewer slots than requested ("
                        + slotsHeldNow + "/" + workersRequested + ")";
            } else if (slotsHeldNow < workersRequested) {
                finalStatus = "Fully Scheduled (requested "
                        + workersRequested
                        + " slots, but could only use "
                        + slotsHeldNow
                        + ")";
            } else {
                finalStatus = "Fully Scheduled";
            }
            cluster.setStatus(topId, finalStatus);
        }
    }