protected SchedulingResult scheduleExecutorsOnNodes()

in storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java [426:547]


    /**
     * Try to place every executor known to {@code searcherState} onto a worker slot, one executor at a time,
     * backtracking to the previous executor whenever no slot is accepted for the current one.
     *
     * <p>Each pass of the outer loop handles the executor returned by {@code searcherState.currentExec()}:
     * executors already recorded as bound ackers are skipped; otherwise the nodes in {@code sortedNodesIter}
     * and their available slots are scanned until {@link #isExecAssignmentToWorkerValid} accepts one.
     * The search stops early when {@code searcherState.areSearchLimitsExceeded()} is true or the current
     * thread is interrupted.
     *
     * @param orderedExecutors executors to schedule; handed to {@code searcherState.setSortedExecs}.
     * @param sortedNodesIter  node ids to try, in order. May be {@code null}, in which case the order is obtained
     *                         from {@code nodeSorter}. It is also replaced by a fresh {@code nodeSorter} ordering
     *                         after every failed placement and, when {@code sortNodesForEachExecutor} is set,
     *                         whenever {@code searcherState.isExecCompDifferentFromPrior()} is true.
     * @return the result built by {@code searcherState.createSchedulingResult}; flagged successful when
     *         {@code searcherState.areAllExecsScheduled()} holds, and unsuccessful when the search limits are
     *         exceeded or the thread is interrupted.
     */
    protected SchedulingResult scheduleExecutorsOnNodes(List<ExecutorDetails> orderedExecutors, Iterable<String> sortedNodesIter) {

        long         startTimeMilli     = Time.currentTimeMillis();
        searcherState.setSortedExecs(orderedExecutors);
        int          maxExecCnt         = searcherState.getExecSize();

        // progressIdx is a running ordinal of the worker slots visited; it is NOT reset on every outer-loop
        // pass, only when the node ordering is recomputed further below.
        // The three arrays that follow hold state for each "execIndex" level.
        int progressIdx = -1;
        // Highest slot ordinal already considered for each executor; -1 means none considered yet.
        int[]        progressIdxForExec = new int[maxExecCnt];
        // Node and worker slot chosen for each executor; both arrays are handed to searcherState.backtrack().
        RasNode[]    nodeForExec        = new RasNode[maxExecCnt];
        WorkerSlot[] workerSlotForExec  = new WorkerSlot[maxExecCnt];

        for (int i = 0; i < maxExecCnt ; i++) {
            progressIdxForExec[i] = -1;
        }
        LOG.debug("scheduleExecutorsOnNodes: will assign {} executors for topo {}, sortNodesForEachExecutor={}",
                maxExecCnt, topoName, sortNodesForEachExecutor);

        // Each pass handles the executor currently selected in searcherState; loopCnt is only used for logging.
        OUTERMOST_LOOP:
        for (int loopCnt = 0 ; true ; loopCnt++) {
            LOG.debug("scheduleExecutorsOnNodes: loopCnt={}, execIndex={}, topo={}", loopCnt, searcherState.getExecIndex(), topoName);
            if (searcherState.areSearchLimitsExceeded()) {
                LOG.warn("Limits exceeded, backtrackCnt={}, loopCnt={}, topo={}", searcherState.getNumBacktrack(), loopCnt, topoName);
                return searcherState.createSchedulingResult(false, this.getClass().getSimpleName());
            }

            // isInterrupted() only reads the interrupt flag (it does not clear it), so the caller can still see it.
            if (Thread.currentThread().isInterrupted()) {
                return searcherState.createSchedulingResult(false, this.getClass().getSimpleName());
            }

            // Capture the index now: searcherState is advanced below before nodeForExec/workerSlotForExec are written.
            int execIndex = searcherState.getExecIndex();
            ExecutorDetails exec = searcherState.currentExec();

            // If current exec is found in searcherState assigned Ackers,
            // it means it has been assigned as a bound acker already.
            // So we skip to the next.
            if (searcherState.getBoundAckers().contains(exec)) {
                if (searcherState.areAllExecsScheduled()) {
                    //Everything is scheduled correctly, so no need to search any more.
                    LOG.info("scheduleExecutorsOnNodes: Done at loopCnt={} in {}ms, state.elapsedtime={}, backtrackCnt={}, topo={}",
                        loopCnt, Time.currentTimeMillis() - startTimeMilli,
                        Time.currentTimeMillis() - searcherState.startTimeMillis,
                        searcherState.getNumBacktrack(),
                        topoName);
                    return searcherState.createSchedulingResult(true, this.getClass().getSimpleName());
                }
                searcherState = searcherState.nextExecutor();
                continue OUTERMOST_LOOP;
            }

            String comp = execToComp.get(exec);
            // (Re)compute the node order when there is none (null argument, or cleared after a failed placement)
            // or when per-executor sorting is enabled and searcherState reports a component change.
            // The slot ordinal restarts together with the new ordering.
            if (sortedNodesIter == null || (this.sortNodesForEachExecutor && searcherState.isExecCompDifferentFromPrior())) {
                progressIdx = -1;
                nodeSorter.prepare(exec);
                sortedNodesIter = nodeSorter.sortAllNodes();
            }

            for (String nodeId : sortedNodesIter) {
                RasNode node = nodes.getNodeById(nodeId);
                // Skip nodes that report they could never fit this executor; their slots are not counted in progressIdx.
                if (!node.couldEverFit(exec, topologyDetails)) {
                    continue;
                }
                for (WorkerSlot workerSlot : node.getSlotsAvailableToScheduleOn()) {
                    progressIdx++;
                    // Skip slots whose ordinal is not beyond the mark already recorded for this executor.
                    if (progressIdx <= progressIdxForExec[execIndex]) {
                        continue;
                    }
                    // Advance the mark before validating, so a rejected slot is also counted as considered.
                    progressIdxForExec[execIndex]++;

                    if (!isExecAssignmentToWorkerValid(exec, workerSlot)) {
                        // exec can't fit in this workerSlot, try next workerSlot
                        LOG.trace("Failed to assign exec={}, comp={}, topo={} to worker={} on node=({}, availCpu={}, availMem={}).",
                            exec, comp, topoName, workerSlot,
                            node.getId(), node.getAvailableCpuResources(), node.getAvailableMemoryResources());
                        continue;
                    }

                    // Slot accepted: count the explored state and assign the executor to this node/slot.
                    searcherState.incStatesSearched();
                    searcherState.assignCurrentExecutor(execToComp, node, workerSlot);
                    int numBoundAckerAssigned = assignBoundAckersForNewWorkerSlot(exec, node, workerSlot);
                    if (numBoundAckerAssigned > 0) {
                        // This exec with some of its bounded ackers have all been successfully assigned
                        searcherState.getExecsWithBoundAckers().add(exec);
                    }

                    if (searcherState.areAllExecsScheduled()) {
                        //Everything is scheduled correctly, so no need to search any more.
                        LOG.info("scheduleExecutorsOnNodes: Done at loopCnt={} in {}ms, state.elapsedtime={}, backtrackCnt={}, topo={}",
                                loopCnt, Time.currentTimeMillis() - startTimeMilli,
                                Time.currentTimeMillis() - searcherState.startTimeMillis,
                                searcherState.getNumBacktrack(),
                                topoName);
                        return searcherState.createSchedulingResult(true, this.getClass().getSimpleName());
                    }
                    // Move on to the next executor, then record the choice under the index captured above.
                    searcherState = searcherState.nextExecutor();
                    nodeForExec[execIndex] = node;
                    workerSlotForExec[execIndex] = workerSlot;
                    LOG.debug("scheduleExecutorsOnNodes: Assigned execId={}, comp={} to node={}/cpu={}/mem={}, "
                            + "slot-ordinal={} at loopCnt={}, topo={}",
                        execIndex, comp, nodeId, node.getAvailableCpuResources(), node.getAvailableMemoryResources(),
                        progressIdx, loopCnt, topoName);
                    continue OUTERMOST_LOOP;
                }
            }
            // Clearing the iterable forces a fresh node sort (and a progressIdx reset) on the next pass.
            sortedNodesIter = null;
            // if here, then the executor was not assigned, backtrack;
            LOG.debug("scheduleExecutorsOnNodes: Failed to schedule execId={}, comp={} at loopCnt={}, topo={}",
                    execIndex, comp, loopCnt, topoName);
            if (execIndex == 0) {
                // Executor index 0 has no earlier executor to back up to, so the search ends here.
                break;
            } else {
                // NOTE(review): backtrack() presumably undoes the previous executor's assignment and makes it
                // current again - confirm against SearcherState.
                searcherState.backtrack(execToComp, nodeForExec, workerSlotForExec);
                // Clear the failed executor's mark so every slot is eligible when it is attempted again.
                progressIdxForExec[execIndex] = -1;
            }
        }
        // Only reached via the break above; success is whatever searcherState reports at this point.
        boolean success = searcherState.areAllExecsScheduled();
        LOG.info("scheduleExecutorsOnNodes: Scheduled={} in {} milliseconds, state.elapsedtime={}, backtrackCnt={}, topo={}",
                success, Time.currentTimeMillis() - startTimeMilli, Time.currentTimeMillis() - searcherState.startTimeMillis,
                searcherState.getNumBacktrack(),
                topoName);
        return searcherState.createSchedulingResult(success, this.getClass().getSimpleName());
    }