private void scheduleTopology()

in storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java [138:267]


    private void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter,
                                  List<TopologyDetails> orderedTopologies, Map<String, Set<String>> tmpEvictedTopologiesMap) {
        // A copy of the cluster that we can modify, but which does not get committed back unless scheduling succeeds
        Cluster workingState = new Cluster(cluster);
        RasNodes nodes = new RasNodes(workingState);
        IStrategy rasStrategy = null;
        String strategyConf = (String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
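        // Keep the raw config string for error reporting; the local copy below may have its package prefix rewritten.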
        try {
            String strategy = strategyConf;
            if (strategy.startsWith("backtype.storm")) {
                // Storm supports launching workers built with older versions.
                // If the TOPOLOGY_SCHEDULER_STRATEGY value comes from such a version, rewrite the package name.
                strategy = strategy.replace("backtype.storm", "org.apache.storm");
                LOG.debug("Replaced backtype.storm with org.apache.storm for Config.TOPOLOGY_SCHEDULER_STRATEGY");
            }
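            // Instantiate the strategy via reflection and prepare it; classes outside the allowed list are rejected (see the catch below).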
            rasStrategy = ReflectionUtils.newSchedulerStrategyInstance(strategy, conf);
            rasStrategy.prepare(conf);
        } catch (DisallowedStrategyException e) {
            markFailedTopology(topologySubmitter, cluster, td,
                               "Unsuccessful in scheduling - " + e.getAttemptedClass()
                               + " is not an allowed strategy. Please make sure your "
                               + Config.TOPOLOGY_SCHEDULER_STRATEGY
                               + " config is one of the allowed strategies: "
                               + e.getAllowedStrategies(), e);
            return;
        } catch (RuntimeException e) {
            markFailedTopology(topologySubmitter, cluster, td,
                               "Unsuccessful in scheduling - failed to create instance of topology strategy "
                               + strategyConf
                               + ". Please check logs for details", e);
            return;
        }

        // Warn here, once, rather than duplicating / spamming the warning inside the strategy / scheduling code.
        boolean oneExecutorPerWorker = (Boolean) td.getConf().get(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER);
        boolean oneComponentPerWorker = (Boolean) td.getConf().get(Config.TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER);
        if (oneExecutorPerWorker && oneComponentPerWorker) {
            LOG.warn("Conflicting options: {} and {} are both set! Ignoring {} option.",
                Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER, Config.TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER,
                Config.TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER);
        }

        TopologySchedulingResources topologySchedulingResources = new TopologySchedulingResources(workingState, td);
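        // The scheduling lambda below can only capture effectively final locals, so re-alias the strategy reference.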
        final IStrategy finalRasStrategy = rasStrategy;
        for (int i = 0; i < maxSchedulingAttempts; i++) {
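            // Give the strategy a view of the working state that is restricted to this one topology.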
            SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, td.getId());
            try {
                SchedulingResult result = null;
                topologySchedulingResources.resetRemaining();
                if (topologySchedulingResources.canSchedule()) {
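                    // Run the strategy on a background thread so it can be bounded by the per-topology scheduling timeout.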
                    Future<SchedulingResult> schedulingFuture = backgroundScheduling.submit(
                        () -> finalRasStrategy.schedule(toSchedule, td));
                    try {
                        result = schedulingFuture.get(schedulingTimeoutSeconds, TimeUnit.SECONDS);
                    } catch (TimeoutException te) {
                        markFailedTopology(topologySubmitter, cluster, td, "Scheduling took too long for "
                                + td.getId() + " using strategy " + rasStrategy.getClass().getName() + " timeout after "
                                + schedulingTimeoutSeconds + " seconds using config "
                                + DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY + ".");
                        schedulingTimeoutMeter.mark();
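                        // cancel(true) interrupts the strategy thread so a runaway strategy does not keep running.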
                        schedulingFuture.cancel(true);
                        return;
                    }
                } else {
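                    // The resource pre-check failed, so skip the strategy and go straight to the eviction path below.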
                    result = SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "");
                }
                LOG.debug("scheduling result: {}", result);
                if (result == null) {
                    markFailedTopology(topologySubmitter, cluster, td, "Internal scheduler error");
                    return;
                } else {
                    if (result.isSuccess()) {
                        cluster.updateFrom(toSchedule);
                        cluster.setStatus(td.getId(), "Running - " + result.getMessage());
                        // Done: the successful assignment has been committed back to the real cluster.
                        return;
                    } else if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
                        LOG.debug("Not enough resources to schedule {}", td.getName());
                        List<TopologyDetails> reversedList = ImmutableList.copyOf(orderedTopologies).reverse();
                        LOG.debug("Attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
                        int tdIndex = reversedList.indexOf(td);
                        topologySchedulingResources.setRemainingRequiredResources(toSchedule, td);

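                        // Tentatively evict lower-priority topologies, one at a time, until the remaining required resources would fit.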
                        Set<String> tmpEvictedTopos = new HashSet<>();
                        for (int index = 0; index < tdIndex; index++) {
                            TopologyDetails topologyEvict = reversedList.get(index);
                            SchedulerAssignment evictAssignment = workingState.getAssignmentById(topologyEvict.getId());
                            if (evictAssignment != null && !evictAssignment.getSlots().isEmpty()) {
                                topologySchedulingResources.adjustResourcesForEvictedTopology(toSchedule, topologyEvict);
                                tmpEvictedTopos.add(topologyEvict.getId());
                                Collection<WorkerSlot> workersToEvict = workingState.getUsedSlotsByTopologyId(topologyEvict.getId());
                                nodes.freeSlots(workersToEvict);
                                if (topologySchedulingResources.canSchedule()) {
                                    // We evicted enough topologies to have a hope of scheduling, so try now
                                    // rather than evicting more than is needed.
                                    break;
                                }
                            }
                        }
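                        // If nothing could be evicted, no amount of retrying will help, so fail now; otherwise record the evictions and retry.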
                        if (!tmpEvictedTopos.isEmpty()) {
                            LOG.warn("Evicted Topologies {} when scheduling topology: {}", tmpEvictedTopos, td.getId());
                            tmpEvictedTopologiesMap.computeIfAbsent(td.getId(), k -> new HashSet<>()).addAll(tmpEvictedTopos);
                        } else {
                            StringBuilder message = new StringBuilder();
                            message.append("Not enough resources to schedule after evicting lower priority topologies. ");
                            message.append(topologySchedulingResources.getRemainingRequiredResourcesMessage());
                            message.append(result.getErrorMessage());
                            markFailedTopology(topologySubmitter, cluster, td, message.toString());
                            return;
                        }
                        // This is the only place where we fall through and run the loop again.
                    } else { //Any other failure result
                        topologySubmitter.markTopoUnsuccess(td, cluster, result.toString());
                        return;
                    }
                }
            } catch (Exception ex) {
                internalErrorMeter.mark();
                markFailedTopology(topologySubmitter, cluster, td,
                        "Internal Error - Exception thrown when scheduling. Please check logs for details", ex);
                return;
            }
        }
        // We can only reach this point if we failed to free enough space within maxSchedulingAttempts attempts,
        // even though the scheduler evicted something on every attempt.
        markFailedTopology(topologySubmitter, cluster, td,
            "Failed to make enough resources for " + td.getId()
                    + " by evicting lower priority topologies within " + maxSchedulingAttempts + " attempts. "
                    + topologySchedulingResources.getRemainingRequiredResourcesMessage());
    }
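
For context, a minimal sketch of how a topology opts in to a strategy that this method instantiates. The config key and the package-prefix rewrite are taken from the code above; the surrounding topology setup is assumed, and DefaultResourceAwareStrategy is one of the strategy implementations shipped with storm-server:

    import org.apache.storm.Config;

    Config conf = new Config();
    // Read at the top of scheduleTopology(); a legacy "backtype.storm" prefix in
    // this value is rewritten to "org.apache.storm" before reflective instantiation.
    conf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY,
        "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy");
    // Optional placement flag also checked above; per the warning in the code,
    // setting this together with the one-component option causes the latter to be ignored.
    conf.put(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER, true);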