in storm-server/src/main/java/org/apache/storm/scheduler/multitenant/DefaultPool.java [128:219]
/**
 * Walks every topology tracked by this pool and, for each one the cluster reports as
 * needing scheduling, tries to place it on this pool's nodes, taking extra nodes from
 * {@code lesserPools} when this pool does not have enough free slots on its own.
 * A human-readable status is recorded on the cluster for every topology visited.
 *
 * @param lesserPools pools that nodes may be taken from (via
 *     {@code NodePool.takeNodesBySlot}) when this pool is short on free slots
 * @throws IllegalStateException if no node with a free slot is left while the slot
 *     scheduler still has slots to assign
 */
public void scheduleAsNeeded(NodePool... lesserPools) {
    for (TopologyDetails td : tds.values()) {
        String topId = td.getId();
        if (!cluster.needsScheduling(td)) {
            cluster.setStatus(topId, "Fully Scheduled");
            continue;
        }

        LOG.debug("Scheduling topology {}", topId);
        int totalTasks = td.getExecutors().size();
        int origRequest = td.getNumWorkers();
        // Never ask for more slots than there are executors to put in them.
        int slotsRequested = Math.min(totalTasks, origRequest);
        int slotsUsed = Node.countSlotsUsed(topId, nodes);
        int slotsFree = Node.countFreeSlotsAlive(nodes);
        //Check to see if we have enough slots before trying to get them
        int slotsAvailable = 0;
        if (slotsRequested > slotsFree) {
            slotsAvailable = NodePool.slotsAvailable(lesserPools);
        }
        int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable);
        int executorsNotRunning = cluster.getUnassignedExecutors(td).size();
        LOG.debug("Slots... requested {} used {} free {} available {} to be used {}, executors not running {}",
                  slotsRequested, slotsUsed, slotsFree, slotsAvailable, slotsToUse, executorsNotRunning);

        if (slotsToUse <= 0) {
            // Nothing more can be placed; just report where the topology stands.
            if (executorsNotRunning > 0) {
                cluster.setStatus(topId, "Not fully scheduled (No free slots in default pool) "
                                         + executorsNotRunning
                                         + " executors not scheduled");
            } else if (slotsUsed < slotsRequested) {
                cluster.setStatus(topId, "Running with fewer slots than requested ("
                                         + slotsUsed + "/"
                                         + origRequest + ")");
            } else {
                // NOTE(review): this message assumes slotsUsed < origRequest here; that is
                // not checked in this branch - confirm it follows from needsScheduling().
                cluster.setStatus(topId,
                                  "Fully Scheduled (requested " + origRequest
                                  + " slots, but could only use " + slotsUsed + ")");
            }
            continue;
        }

        // Pull in nodes from the lesser pools to cover whatever this pool cannot supply.
        int slotsNeeded = slotsToUse - slotsFree;
        if (slotsNeeded > 0) {
            nodes.addAll(NodePool.takeNodesBySlot(slotsNeeded, lesserPools));
        }

        if (executorsNotRunning <= 0) {
            //There are free slots that we can take advantage of now.
            // Every executor is already assigned, so the topology is freed from all nodes
            // and rescheduled from scratch across the slots that are free afterwards.
            for (Node n : nodes) {
                n.freeTopology(topId, cluster);
            }
            slotsFree = Node.countFreeSlotsAlive(nodes);
            slotsToUse = Math.min(slotsRequested, slotsFree);
        }

        RoundRobinSlotScheduler slotSched =
            new RoundRobinSlotScheduler(td, slotsToUse, cluster);
        // Work on a private copy so exhausted nodes can be dropped without touching the
        // pool's own node collection.
        LinkedList<Node> candidates = new LinkedList<>(nodes);
        boolean moreToAssign = true;
        while (moreToAssign) {
            // Discard nodes at the head of the list that have no free slot left.
            while (!candidates.isEmpty() && candidates.peekFirst().totalSlotsFree() == 0) {
                candidates.remove();
            }
            if (candidates.isEmpty()) {
                throw new IllegalStateException("This should not happen, we"
                                                + " messed up and did not get enough slots");
            }
            // The loop ends once assignSlotTo returns false.
            moreToAssign = slotSched.assignSlotTo(candidates.peekFirst());
        }

        int afterSchedSlotsUsed = Node.countSlotsUsed(topId, nodes);
        if (afterSchedSlotsUsed < slotsRequested) {
            cluster.setStatus(topId, "Running with fewer slots than requested ("
                                     + afterSchedSlotsUsed + "/" + origRequest + ")");
        } else if (afterSchedSlotsUsed < origRequest) {
            cluster.setStatus(topId,
                              "Fully Scheduled (requested "
                              + origRequest
                              + " slots, but could only use "
                              + afterSchedSlotsUsed
                              + ")");
        } else {
            cluster.setStatus(topId, "Fully Scheduled");
        }
    }
}