public List<OwnerResourceSummary> getOwnerResourceSummaries(String owner)

in storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java [4971:5091]


    /**
     * Builds one {@link OwnerResourceSummary} per topology owner, aggregating topology, executor, worker and
     * task counts together with requested and assigned resources, and attaching scheduler guarantees when the
     * scheduler config has an entry for that owner.
     *
     * <p>When {@code owner} is {@code null}, every owner found in the topology bases is reported, plus every
     * key of the scheduler config (those get an empty topology list if they own nothing). Otherwise only the
     * given owner is reported, even if it owns no topologies.
     *
     * <p>A topology whose lookup raises {@code NotAliveException} is logged and left out of the resource,
     * executor, worker and task totals, but it is still counted in {@code total_topologies}.
     *
     * @param owner the owner to summarize, or {@code null} to summarize all owners
     * @return one summary per owner considered; never {@code null}
     * @throws AuthorizationException declared by this method; presumably raised by the authorization check
     * @throws TException rethrown unchanged when raised while building the summaries; any other exception is
     *     wrapped in a {@link RuntimeException}
     */
    public List<OwnerResourceSummary> getOwnerResourceSummaries(String owner) throws AuthorizationException, TException {
        try {
            getOwnerResourceSummariesCalls.mark();
            checkAuthorization(null, null, "getOwnerResourceSummaries");
            IStormClusterState state = stormClusterState;
            Map<String, Assignment> topoIdToAssignments = state.assignmentsInfo();
            Map<String, StormBase> topoIdToBases = state.topologyBases();
            Map<String, Number> clusterSchedulerConfig = scheduler.config();

            // Put the [owner -> StormBase list] mapping into ownerToBasesMap.
            // If the owner (the input parameter) is null, add all owners with a StormBase or a guarantee;
            // otherwise add only this owner (the input parameter) to the map.
            Map<String, List<StormBase>> ownerToBasesMap = new HashMap<>();

            if (owner == null) {
                // Group every topology base under its owner.
                for (StormBase base : topoIdToBases.values()) {
                    ownerToBasesMap.computeIfAbsent(base.get_owner(), k -> new ArrayList<>()).add(base);
                }
                // In addition, add all the owners with guarantees that do not own any topology.
                for (String ownerWithGuarantees : clusterSchedulerConfig.keySet()) {
                    ownerToBasesMap.putIfAbsent(ownerWithGuarantees, new ArrayList<>());
                }
            } else {
                // Only put this owner into the map (with an empty list when it owns nothing).
                List<StormBase> stormbases = new ArrayList<>();
                for (StormBase base : topoIdToBases.values()) {
                    if (owner.equals(base.get_owner())) {
                        stormbases.add(base);
                    }
                }
                ownerToBasesMap.put(owner, stormbases);
            }

            List<OwnerResourceSummary> ret = new ArrayList<>();

            // For each owner, get resources and configs, and aggregate them.
            for (Entry<String, List<StormBase>> ownerToBasesEntry : ownerToBasesMap.entrySet()) {
                String theOwner = ownerToBasesEntry.getKey();
                TopologyResources totalResourcesAggregate = new TopologyResources();

                int totalExecutors = 0;
                int totalWorkers = 0;
                int totalTasks = 0;

                for (StormBase base : ownerToBasesEntry.getValue()) {
                    try {
                        String topoId = toTopoId(base.get_name());
                        TopologyResources resources = getResourcesForTopology(topoId, base);
                        totalResourcesAggregate = totalResourcesAggregate.add(resources);
                        Assignment ownerAssignment = topoIdToAssignments.get(topoId);
                        if (ownerAssignment != null && ownerAssignment.get_executor_node_port() != null) {
                            Map<List<Long>, ?> executorToNodePort = ownerAssignment.get_executor_node_port();
                            totalExecutors += executorToNodePort.size();
                            // Distinct node/port values, i.e. one per worker slot in use.
                            totalWorkers += new HashSet<Object>(executorToNodePort.values()).size();
                            for (List<Long> executorId : executorToNodePort.keySet()) {
                                totalTasks += StormCommon.executorIdToTasks(executorId).size();
                            }
                        }
                    } catch (NotAliveException e) {
                        LOG.warn("{} is not alive.", base.get_name());
                    }
                }

                double requestedTotalMemory = totalResourcesAggregate.getRequestedMemOnHeap()
                                              + totalResourcesAggregate.getRequestedMemOffHeap();
                double assignedTotalMemory = totalResourcesAggregate.getAssignedMemOnHeap()
                                             + totalResourcesAggregate.getAssignedMemOffHeap();

                OwnerResourceSummary ownerResourceSummary = new OwnerResourceSummary(theOwner);
                ownerResourceSummary.set_total_topologies(ownerToBasesEntry.getValue().size());
                ownerResourceSummary.set_total_executors(totalExecutors);
                ownerResourceSummary.set_total_workers(totalWorkers);
                ownerResourceSummary.set_total_tasks(totalTasks);
                ownerResourceSummary.set_memory_usage(assignedTotalMemory);
                ownerResourceSummary.set_cpu_usage(totalResourcesAggregate.getAssignedCpu());
                ownerResourceSummary.set_requested_on_heap_memory(totalResourcesAggregate.getRequestedMemOnHeap());
                ownerResourceSummary.set_requested_off_heap_memory(totalResourcesAggregate.getRequestedMemOffHeap());
                ownerResourceSummary.set_requested_total_memory(requestedTotalMemory);
                ownerResourceSummary.set_requested_cpu(totalResourcesAggregate.getRequestedCpu());
                ownerResourceSummary.set_assigned_on_heap_memory(totalResourcesAggregate.getAssignedMemOnHeap());
                ownerResourceSummary.set_assigned_off_heap_memory(totalResourcesAggregate.getAssignedMemOffHeap());

                if (clusterSchedulerConfig.containsKey(theOwner)) {
                    if (underlyingScheduler instanceof ResourceAwareScheduler) {
                        // With this scheduler the per-owner entry is read as a nested map, despite the
                        // declared Number value type; the raw cast is kept exactly as it was.
                        Map<String, Object> schedulerConfig = (Map) clusterSchedulerConfig.get(theOwner);
                        if (schedulerConfig != null) {
                            // Convert through Number: a direct (double) cast of an Object only accepts a
                            // Double and would throw ClassCastException for an Integer (including the
                            // former Integer default used when the key is missing).
                            Object memoryGuarantee = schedulerConfig.get("memory");
                            Object cpuGuarantee = schedulerConfig.get("cpu");
                            ownerResourceSummary.set_memory_guarantee(
                                memoryGuarantee == null ? 0.0 : ((Number) memoryGuarantee).doubleValue());
                            ownerResourceSummary.set_cpu_guarantee(
                                cpuGuarantee == null ? 0.0 : ((Number) cpuGuarantee).doubleValue());
                            ownerResourceSummary.set_memory_guarantee_remaining(ownerResourceSummary.get_memory_guarantee()
                                                                                - ownerResourceSummary.get_memory_usage());
                            ownerResourceSummary.set_cpu_guarantee_remaining(ownerResourceSummary.get_cpu_guarantee()
                                                                             - ownerResourceSummary.get_cpu_usage());
                        }
                    } else if (underlyingScheduler instanceof MultitenantScheduler) {
                        // intValue() accepts any Number subtype; an (int) cast only accepts an Integer.
                        Number isolatedNodeGuarantee = clusterSchedulerConfig.get(theOwner);
                        ownerResourceSummary.set_isolated_node_guarantee(
                            isolatedNodeGuarantee == null ? 0 : isolatedNodeGuarantee.intValue());
                    }
                }

                LOG.debug("{}", ownerResourceSummary);
                ret.add(ownerResourceSummary);
            }

            return ret;
        } catch (Exception e) {
            // Pass the throwable so the stack trace is logged, not just the owner.
            LOG.warn("Get owner resource summaries exception. (owner = '{}')", owner, e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }