public void schedule()

in storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java [68:144]


    /**
     * Places isolated topologies on dedicated hosts and then deals with everything else.
     *
     * <p>The method works in three phases:
     * <ol>
     *   <li>For every host that already has assignments, keep the host as-is (and blacklist it)
     *       when its assignments pass all isolation checks below; otherwise free the slots on
     *       that host that belong to isolated topologies.</li>
     *   <li>For each isolated topology, walk the worker counts it still needs and, whenever the
     *       host at the head of the assignable-host list has enough slots, take that host over:
     *       free its used slots, assign the topology's executors and blacklist the host.</li>
     *   <li>If any isolated topology is still not fully placed, log a warning and free the used
     *       slots on every non-blacklisted host; otherwise run the default scheduler on the
     *       remaining topologies.</li>
     * </ol>
     *
     * <p>Hosts are blacklisted on the cluster while this method runs; the blacklist that was in
     * effect when the method was entered is put back before it returns normally.
     *
     * @param topologies all topologies to consider for scheduling
     * @param cluster    the cluster whose slots are freed/assigned and whose blacklist is
     *                   temporarily modified
     */
    public void schedule(Topologies topologies, Cluster cluster) {
        // Snapshot the blacklist BEFORE any blacklistHost() call below, so the restore at the end
        // really puts back the caller's blacklist instead of the one this method has built up.
        // NOTE(review): copied in case getBlacklistedHosts() hands out the cluster's live set —
        // confirm against Cluster. java.util.HashSet is fully qualified so no import is needed.
        Set<String> blacklistOnEntry = cluster.getBlacklistedHosts();
        Set<String> origBlacklist = blacklistOnEntry == null ? null : new java.util.HashSet<>(blacklistOnEntry);

        List<TopologyDetails> isoTopologies = isolatedTopologies(topologies.getTopologies());
        Set<String> isoIds = extractTopologyIds(isoTopologies);
        Map<String, Set<Set<ExecutorDetails>>> topologyWorkerSpecs = topologyWorkerSpecs(isoTopologies);
        Map<String, Map<Integer, Integer>> topologyMachineDistributions = topologyMachineDistributions(isoTopologies);
        Map<String, List<AssignmentInfo>> hostAssignments = hostAssignments(cluster);

        // Phase 1: decide, host by host, whether the existing assignments can stay.
        for (Map.Entry<String, List<AssignmentInfo>> entry : hostAssignments.entrySet()) {
            List<AssignmentInfo> assignments = entry.getValue();
            // The first assignment's topology is the candidate "owner" of this host.
            String topologyId = assignments.get(0).getTopologyId();
            // Both lookups may be null for a non-isolated topology; they are only dereferenced
            // after the isoIds.contains(...) check below has passed.
            Map<Integer, Integer> distribution = topologyMachineDistributions.get(topologyId);
            Set<Set<ExecutorDetails>> workerSpecs = topologyWorkerSpecs.get(topologyId);
            int numWorkers = assignments.size();

            if (isoIds.contains(topologyId)
                && checkAssignmentTopology(assignments, topologyId)
                && distribution.containsKey(numWorkers)
                && checkAssignmentWorkerSpecs(assignments, workerSpecs)) {
                // Host is accepted: account for it and take its workers out of the pending specs.
                decrementDistribution(distribution, numWorkers);
                for (AssignmentInfo assignment : assignments) {
                    workerSpecs.remove(assignment.getExecutors());
                }
                cluster.blacklistHost(entry.getKey());
            } else {
                // Host is rejected: release only the slots held by isolated topologies.
                for (AssignmentInfo assignment : assignments) {
                    if (isoIds.contains(assignment.getTopologyId())) {
                        cluster.freeSlot(assignment.getWorkerSlot());
                    }
                }
            }
        }

        // Phase 2: place the still-pending workers of each isolated topology on whole hosts.
        Map<String, Set<WorkerSlot>> hostUsedSlots = hostToUsedSlots(cluster);
        LinkedList<HostAssignableSlots> hss = hostAssignableSlots(cluster);
        for (Map.Entry<String, Set<Set<ExecutorDetails>>> entry : topologyWorkerSpecs.entrySet()) {
            String topologyId = entry.getKey();
            Set<Set<ExecutorDetails>> executorSet = entry.getValue();
            List<Integer> workerNum = distributionToSortedAmounts(topologyMachineDistributions.get(topologyId));
            for (Integer num : workerNum) {
                // Only the head of the list is considered; it is consumed only if it is big enough.
                HostAssignableSlots hostSlots = hss.peek();
                List<WorkerSlot> slot = hostSlots != null ? hostSlots.getWorkerSlots() : null;

                if (slot != null && slot.size() >= num) {
                    hss.poll();
                    // Evict whatever currently runs on the host before handing it to this topology.
                    cluster.freeSlots(hostUsedSlots.get(hostSlots.getHostName()));
                    for (WorkerSlot tmpSlot : slot.subList(0, num)) {
                        Set<ExecutorDetails> executor = removeElemFromExecutorsSet(executorSet);
                        cluster.assign(tmpSlot, topologyId, executor);
                    }
                    cluster.blacklistHost(hostSlots.getHostName());
                }
            }
        }

        // Phase 3: either make room for the isolated topologies or schedule everything else.
        List<String> failedTopologyIds = extractFailedTopologyIds(topologyWorkerSpecs);
        if (!failedTopologyIds.isEmpty()) {
            LOG.warn("Unable to isolate topologies " + failedTopologyIds
                     + ". No machine had enough worker slots to run the remaining workers for these topologies. "
                     + "Clearing all other resources and will wait for enough resources for "
                     + "isolated topologies before allocating any other resources.");
            // clear workers off all hosts that are not blacklisted
            Map<String, Set<WorkerSlot>> usedSlots = hostToUsedSlots(cluster);
            for (Map.Entry<String, Set<WorkerSlot>> entry : usedSlots.entrySet()) {
                if (!cluster.isBlacklistedHost(entry.getKey())) {
                    cluster.freeSlots(entry.getValue());
                }
            }
        } else {
            // run default scheduler on non-isolated topologies
            Set<String> allocatedTopologies = allocatedTopologies(topologyWorkerSpecs);
            Topologies leftOverTopologies = leftoverTopologies(topologies, allocatedTopologies);
            DefaultScheduler.defaultSchedule(leftOverTopologies, cluster);
        }

        // Undo the temporary blacklisting done in phases 1 and 2.
        cluster.setBlacklistedHosts(origBlacklist);
    }