in storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java [426:547]
/**
 * Try to place every executor in {@code orderedExecutors} onto a worker slot, walking nodes in sorted
 * order and backtracking when the current executor cannot be placed on any available slot.
 *
 * <p>The search state lives in {@code searcherState}; this method advances it with
 * {@code nextExecutor()} after each successful assignment and calls {@code backtrack(...)} when an
 * executor cannot be placed. The loop ends when all executors are scheduled, when
 * {@code searcherState.areSearchLimitsExceeded()} reports true, when the current thread is interrupted,
 * or when the executor at index 0 cannot be placed.
 *
 * @param orderedExecutors executors in the order they should be scheduled; handed to
 *                         {@code searcherState.setSortedExecs}.
 * @param sortedNodesIter  node ids to try, in order. When {@code null}, or when
 *                         {@code sortNodesForEachExecutor} is set and the current executor's component
 *                         differs from the prior one, the nodes are (re-)sorted through {@code nodeSorter}.
 * @return the result built by {@code searcherState.createSchedulingResult}; the success flag is
 *     {@code true} only when {@code searcherState.areAllExecsScheduled()} held, and {@code false} when the
 *     search limits were exceeded or the thread was interrupted.
 */
protected SchedulingResult scheduleExecutorsOnNodes(List<ExecutorDetails> orderedExecutors, Iterable<String> sortedNodesIter) {
long startTimeMilli = Time.currentTimeMillis();
searcherState.setSortedExecs(orderedExecutors);
int maxExecCnt = searcherState.getExecSize();
// progressIdx is a single running ordinal of the worker slots visited; it is reset to -1 only when the
// nodes are re-sorted. The three arrays after it hold state information at each "execIndex" level.
int progressIdx = -1;
int[] progressIdxForExec = new int[maxExecCnt];
RasNode[] nodeForExec = new RasNode[maxExecCnt];
WorkerSlot[] workerSlotForExec = new WorkerSlot[maxExecCnt];
// -1 means no slot has been tried yet for that executor
for (int i = 0; i < maxExecCnt ; i++) {
progressIdxForExec[i] = -1;
}
LOG.debug("scheduleExecutorsOnNodes: will assign {} executors for topo {}, sortNodesForEachExecutor={}",
maxExecCnt, topoName, sortNodesForEachExecutor);
// Each pass of this loop handles the executor that searcherState currently points at.
OUTERMOST_LOOP:
for (int loopCnt = 0 ; true ; loopCnt++) {
LOG.debug("scheduleExecutorsOnNodes: loopCnt={}, execIndex={}, topo={}", loopCnt, searcherState.getExecIndex(), topoName);
// Give up with a failed result once the search limits are exceeded.
if (searcherState.areSearchLimitsExceeded()) {
LOG.warn("Limits exceeded, backtrackCnt={}, loopCnt={}, topo={}", searcherState.getNumBacktrack(), loopCnt, topoName);
return searcherState.createSchedulingResult(false, this.getClass().getSimpleName());
}
// Stop with a failed result if this thread was interrupted; isInterrupted() leaves the flag set.
if (Thread.currentThread().isInterrupted()) {
return searcherState.createSchedulingResult(false, this.getClass().getSimpleName());
}
int execIndex = searcherState.getExecIndex();
ExecutorDetails exec = searcherState.currentExec();
// If current exec is found in searcherState assigned Ackers,
// it means it has been assigned as a bound acker already.
// So we skip to the next.
if (searcherState.getBoundAckers().contains(exec)) {
if (searcherState.areAllExecsScheduled()) {
//Everything is scheduled correctly, so no need to search any more.
LOG.info("scheduleExecutorsOnNodes: Done at loopCnt={} in {}ms, state.elapsedtime={}, backtrackCnt={}, topo={}",
loopCnt, Time.currentTimeMillis() - startTimeMilli,
Time.currentTimeMillis() - searcherState.startTimeMillis,
searcherState.getNumBacktrack(),
topoName);
return searcherState.createSchedulingResult(true, this.getClass().getSimpleName());
}
searcherState = searcherState.nextExecutor();
continue OUTERMOST_LOOP;
}
String comp = execToComp.get(exec);
// (Re-)sort the nodes when there is no current ordering (first call with null, or after a failed
// placement below), or when per-executor sorting is enabled and the component changed.
// The running slot ordinal restarts together with the new ordering.
if (sortedNodesIter == null || (this.sortNodesForEachExecutor && searcherState.isExecCompDifferentFromPrior())) {
progressIdx = -1;
nodeSorter.prepare(exec);
sortedNodesIter = nodeSorter.sortAllNodes();
}
for (String nodeId : sortedNodesIter) {
RasNode node = nodes.getNodeById(nodeId);
// Skip nodes that could never fit this executor.
if (!node.couldEverFit(exec, topologyDetails)) {
continue;
}
for (WorkerSlot workerSlot : node.getSlotsAvailableToScheduleOn()) {
progressIdx++;
// Skip slots whose ordinal is at or below the one already recorded for this executor,
// presumably so that a retry after backtracking resumes past slots tried before.
if (progressIdx <= progressIdxForExec[execIndex]) {
continue;
}
// Record that one more slot has been considered for this executor.
// NOTE(review): this advances by one while progressIdx is not reset when the same
// sortedNodesIter is reused for the next executor, so the two can diverge - confirm intended.
progressIdxForExec[execIndex]++;
if (!isExecAssignmentToWorkerValid(exec, workerSlot)) {
// exec can't fit in this workerSlot, try next workerSlot
LOG.trace("Failed to assign exec={}, comp={}, topo={} to worker={} on node=({}, availCpu={}, availMem={}).",
exec, comp, topoName, workerSlot,
node.getId(), node.getAvailableCpuResources(), node.getAvailableMemoryResources());
continue;
}
// Valid slot found: count the state, assign the executor, then assign its bound ackers.
searcherState.incStatesSearched();
searcherState.assignCurrentExecutor(execToComp, node, workerSlot);
int numBoundAckerAssigned = assignBoundAckersForNewWorkerSlot(exec, node, workerSlot);
if (numBoundAckerAssigned > 0) {
// This exec with some of its bounded ackers have all been successfully assigned
searcherState.getExecsWithBoundAckers().add(exec);
}
if (searcherState.areAllExecsScheduled()) {
//Everything is scheduled correctly, so no need to search any more.
LOG.info("scheduleExecutorsOnNodes: Done at loopCnt={} in {}ms, state.elapsedtime={}, backtrackCnt={}, topo={}",
loopCnt, Time.currentTimeMillis() - startTimeMilli,
Time.currentTimeMillis() - searcherState.startTimeMillis,
searcherState.getNumBacktrack(),
topoName);
return searcherState.createSchedulingResult(true, this.getClass().getSimpleName());
}
// Move on to the next executor and remember where this one was placed; the two arrays are
// what backtrack(...) receives below. execIndex still refers to the executor just assigned.
searcherState = searcherState.nextExecutor();
nodeForExec[execIndex] = node;
workerSlotForExec[execIndex] = workerSlot;
LOG.debug("scheduleExecutorsOnNodes: Assigned execId={}, comp={} to node={}/cpu={}/mem={}, "
+ "slot-ordinal={} at loopCnt={}, topo={}",
execIndex, comp, nodeId, node.getAvailableCpuResources(), node.getAvailableMemoryResources(),
progressIdx, loopCnt, topoName);
continue OUTERMOST_LOOP;
}
}
// No node/slot accepted this executor. Clearing the iterable forces a re-sort (and a reset of
// progressIdx) on the next pass of the outer loop.
sortedNodesIter = null;
// if here, then the executor was not assigned, backtrack;
LOG.debug("scheduleExecutorsOnNodes: Failed to schedule execId={}, comp={} at loopCnt={}, topo={}",
execIndex, comp, loopCnt, topoName);
if (execIndex == 0) {
// The first executor could not be placed, so there is no earlier executor to backtrack to.
break;
} else {
// Step back using the recorded placements (presumably undoing the previous executor's assignment -
// see searcherState.backtrack), and forget the slots tried for the executor that just failed.
searcherState.backtrack(execToComp, nodeForExec, workerSlotForExec);
progressIdxForExec[execIndex] = -1;
}
}
// Reached only via the break above; report whatever the search state says about completeness.
boolean success = searcherState.areAllExecsScheduled();
LOG.info("scheduleExecutorsOnNodes: Scheduled={} in {} milliseconds, state.elapsedtime={}, backtrackCnt={}, topo={}",
success, Time.currentTimeMillis() - startTimeMilli, Time.currentTimeMillis() - searcherState.startTimeMillis,
searcherState.getNumBacktrack(),
topoName);
return searcherState.createSchedulingResult(success, this.getClass().getSimpleName());
}