in storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/RoundRobinResourceAwareStrategy.java [87:186]
protected SchedulingResult scheduleExecutorsOnNodes(List<ExecutorDetails> orderedExecutors, Iterable<String> sortedNodesIterable) {
long startTimeMilli = Time.currentTimeMillis();
int maxExecCnt = searcherState.getExecSize();
int nodeSortCnt = 1;
Iterator<String> sortedNodesIter = null;
ArrayList<String> sortedNodes = getTruncatedNodeList(sortedNodesIterable);
LOG.debug("scheduleExecutorsOnNodes: will assign {} executors for topo {}", maxExecCnt, topoName);
searcherState.setSortedExecs(orderedExecutors);
OUTERMOST_LOOP:
for (int loopCnt = 0 ; true ; loopCnt++) {
LOG.debug("scheduleExecutorsOnNodes: loopCnt={}, execIndex={}, topo={}, nodeSortCnt={}",
loopCnt, searcherState.getExecIndex(), topoName, nodeSortCnt);
if (searcherState.areSearchLimitsExceeded()) {
LOG.warn("Limits exceeded, loopCnt={}, topo={}, nodeSortCnt={}", loopCnt, topoName, nodeSortCnt);
return searcherState.createSchedulingResult(false, this.getClass().getSimpleName());
}
if (Thread.currentThread().isInterrupted()) {
return searcherState.createSchedulingResult(false, this.getClass().getSimpleName());
}
int execIndex = searcherState.getExecIndex();
ExecutorDetails exec = searcherState.currentExec();
// If current exec is found in searcherState assigned Ackers,
// it means it has been assigned as a bound acker already.
// So we skip to the next.
if (searcherState.getBoundAckers().contains(exec)) {
if (searcherState.areAllExecsScheduled()) {
//Everything is scheduled correctly, so no need to search any more.
LOG.info("scheduleExecutorsOnNodes: Done at loopCnt={} in {}ms, state.elapsedtime={}, topo={}, nodeSortCnt={}",
loopCnt, Time.currentTimeMillis() - startTimeMilli,
Time.currentTimeMillis() - searcherState.getStartTimeMillis(),
topoName, nodeSortCnt);
return searcherState.createSchedulingResult(true, this.getClass().getSimpleName());
}
searcherState = searcherState.nextExecutor();
continue OUTERMOST_LOOP;
}
String comp = execToComp.get(exec);
// start at the beginning of node list when component changes or when at end of nodes
if (sortedNodesIter == null || searcherState.isExecCompDifferentFromPrior() || !sortedNodesIter.hasNext()) {
sortedNodesIter = sortedNodes.iterator();
nodeSortCnt++;
}
while (sortedNodesIter.hasNext()) {
String nodeId = sortedNodesIter.next();
RasNode node = nodes.getNodeById(nodeId);
if (!node.couldEverFit(exec, topologyDetails)) {
continue;
}
for (WorkerSlot workerSlot : node.getSlotsAvailableToScheduleOn()) {
if (!isExecAssignmentToWorkerValid(exec, workerSlot)) {
// exec can't fit in this workerSlot, try next workerSlot
LOG.trace("Failed to assign exec={}, comp={}, topo={} to worker={} on node=({}, availCpu={}, availMem={}).",
exec, comp, topoName, workerSlot,
node.getId(), node.getAvailableCpuResources(), node.getAvailableMemoryResources());
continue;
}
searcherState.incStatesSearched();
searcherState.assignCurrentExecutor(execToComp, node, workerSlot);
int numBoundAckerAssigned = assignBoundAckersForNewWorkerSlot(exec, node, workerSlot);
if (numBoundAckerAssigned > 0) {
// This exec with some of its bounded ackers have all been successfully assigned
searcherState.getExecsWithBoundAckers().add(exec);
}
if (searcherState.areAllExecsScheduled()) {
//Everything is scheduled correctly, so no need to search any more.
LOG.info("scheduleExecutorsOnNodes: Done at loopCnt={} in {}ms, state.elapsedtime={}, topo={}, nodeSortCnt={}",
loopCnt, Time.currentTimeMillis() - startTimeMilli,
Time.currentTimeMillis() - searcherState.getStartTimeMillis(),
topoName, nodeSortCnt);
return searcherState.createSchedulingResult(true, this.getClass().getSimpleName());
}
searcherState = searcherState.nextExecutor();
LOG.debug("scheduleExecutorsOnNodes: Assigned execId={}, comp={} to node={}/cpu={}/mem={}, "
+ "worker-port={} at loopCnt={}, topo={}, nodeSortCnt={}",
execIndex, comp, nodeId, node.getAvailableCpuResources(), node.getAvailableMemoryResources(),
workerSlot.getPort(), loopCnt, topoName, nodeSortCnt);
continue OUTERMOST_LOOP;
}
}
// if here, then the executor was not assigned, scheduling failed
LOG.debug("scheduleExecutorsOnNodes: Failed to schedule execId={}, comp={} at loopCnt={}, topo={}, nodeSortCnt={}",
execIndex, comp, loopCnt, topoName, nodeSortCnt);
break;
}
boolean success = searcherState.areAllExecsScheduled();
LOG.info("scheduleExecutorsOnNodes: Scheduled={} in {} milliseconds, state.elapsedtime={}, topo={}, nodeSortCnt={}",
success, Time.currentTimeMillis() - startTimeMilli, Time.currentTimeMillis() - searcherState.getStartTimeMillis(),
topoName, nodeSortCnt);
return searcherState.createSchedulingResult(success, this.getClass().getSimpleName());
}