protected SchedulingResult scheduleExecutorsOnNodes()

in storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/RoundRobinResourceAwareStrategy.java [87:186]


    protected SchedulingResult scheduleExecutorsOnNodes(List<ExecutorDetails> orderedExecutors, Iterable<String> sortedNodesIterable) {
        long startTimeMilli = Time.currentTimeMillis();
        int  maxExecCnt     = searcherState.getExecSize();
        int  nodeSortCnt    = 1;
        Iterator<String> sortedNodesIter = null;
        ArrayList<String> sortedNodes = getTruncatedNodeList(sortedNodesIterable);

        LOG.debug("scheduleExecutorsOnNodes: will assign {} executors for topo {}", maxExecCnt, topoName);

        searcherState.setSortedExecs(orderedExecutors);

        OUTERMOST_LOOP:
        for (int loopCnt = 0 ; true ; loopCnt++) {
            LOG.debug("scheduleExecutorsOnNodes: loopCnt={}, execIndex={}, topo={}, nodeSortCnt={}",
                    loopCnt, searcherState.getExecIndex(), topoName, nodeSortCnt);
            if (searcherState.areSearchLimitsExceeded()) {
                LOG.warn("Limits exceeded, loopCnt={}, topo={}, nodeSortCnt={}", loopCnt, topoName, nodeSortCnt);
                return searcherState.createSchedulingResult(false, this.getClass().getSimpleName());
            }

            if (Thread.currentThread().isInterrupted()) {
                return searcherState.createSchedulingResult(false, this.getClass().getSimpleName());
            }

            int execIndex = searcherState.getExecIndex();
            ExecutorDetails exec = searcherState.currentExec();

            // If current exec is found in searcherState assigned Ackers,
            // it means it has been assigned as a bound acker already.
            // So we skip to the next.
            if (searcherState.getBoundAckers().contains(exec)) {
                if (searcherState.areAllExecsScheduled()) {
                    //Everything is scheduled correctly, so no need to search any more.
                    LOG.info("scheduleExecutorsOnNodes: Done at loopCnt={} in {}ms, state.elapsedtime={}, topo={}, nodeSortCnt={}",
                        loopCnt, Time.currentTimeMillis() - startTimeMilli,
                        Time.currentTimeMillis() - searcherState.getStartTimeMillis(),
                        topoName, nodeSortCnt);
                    return searcherState.createSchedulingResult(true, this.getClass().getSimpleName());
                }
                searcherState = searcherState.nextExecutor();
                continue OUTERMOST_LOOP;
            }

            String comp = execToComp.get(exec);
            // start at the beginning of node list when component changes or when at end of nodes
            if (sortedNodesIter == null || searcherState.isExecCompDifferentFromPrior() || !sortedNodesIter.hasNext()) {
                sortedNodesIter = sortedNodes.iterator();
                nodeSortCnt++;
            }

            while (sortedNodesIter.hasNext()) {
                String nodeId = sortedNodesIter.next();
                RasNode node = nodes.getNodeById(nodeId);
                if (!node.couldEverFit(exec, topologyDetails)) {
                    continue;
                }
                for (WorkerSlot workerSlot : node.getSlotsAvailableToScheduleOn()) {
                    if (!isExecAssignmentToWorkerValid(exec, workerSlot)) {
                        // exec can't fit in this workerSlot, try next workerSlot
                        LOG.trace("Failed to assign exec={}, comp={}, topo={} to worker={} on node=({}, availCpu={}, availMem={}).",
                            exec, comp, topoName, workerSlot,
                            node.getId(), node.getAvailableCpuResources(), node.getAvailableMemoryResources());
                        continue;
                    }

                    searcherState.incStatesSearched();
                    searcherState.assignCurrentExecutor(execToComp, node, workerSlot);
                    int numBoundAckerAssigned = assignBoundAckersForNewWorkerSlot(exec, node, workerSlot);
                    if (numBoundAckerAssigned > 0) {
                        // This exec with some of its bounded ackers have all been successfully assigned
                        searcherState.getExecsWithBoundAckers().add(exec);
                    }

                    if (searcherState.areAllExecsScheduled()) {
                        //Everything is scheduled correctly, so no need to search any more.
                        LOG.info("scheduleExecutorsOnNodes: Done at loopCnt={} in {}ms, state.elapsedtime={}, topo={}, nodeSortCnt={}",
                                loopCnt, Time.currentTimeMillis() - startTimeMilli,
                                Time.currentTimeMillis() - searcherState.getStartTimeMillis(),
                                topoName, nodeSortCnt);
                        return searcherState.createSchedulingResult(true, this.getClass().getSimpleName());
                    }
                    searcherState = searcherState.nextExecutor();
                    LOG.debug("scheduleExecutorsOnNodes: Assigned execId={}, comp={} to node={}/cpu={}/mem={}, "
                            + "worker-port={} at loopCnt={}, topo={}, nodeSortCnt={}",
                        execIndex, comp, nodeId, node.getAvailableCpuResources(), node.getAvailableMemoryResources(),
                        workerSlot.getPort(), loopCnt, topoName, nodeSortCnt);
                    continue OUTERMOST_LOOP;
                }
            }
            // if here, then the executor was not assigned, scheduling failed
            LOG.debug("scheduleExecutorsOnNodes: Failed to schedule execId={}, comp={} at loopCnt={}, topo={}, nodeSortCnt={}",
                    execIndex, comp, loopCnt, topoName, nodeSortCnt);
            break;
        }
        boolean success = searcherState.areAllExecsScheduled();
        LOG.info("scheduleExecutorsOnNodes: Scheduled={} in {} milliseconds, state.elapsedtime={}, topo={}, nodeSortCnt={}",
                success, Time.currentTimeMillis() - startTimeMilli, Time.currentTimeMillis() - searcherState.getStartTimeMillis(),
                topoName, nodeSortCnt);
        return searcherState.createSchedulingResult(success, this.getClass().getSimpleName());
    }