private TopologyInfo getTopologyInfoWithOptsImpl()

in storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java [4297:4380]


    private TopologyInfo getTopologyInfoWithOptsImpl(String topoId, GetInfoOptions options) throws NotAliveException,
        AuthorizationException, InvalidTopologyException, Exception {
        CommonTopoInfo common = getCommonTopoInfo(topoId, "getTopologyInfo");
        if (common.base == null) {
            throw new WrappedNotAliveException(topoId);
        }
        IStormClusterState state = stormClusterState;
        NumErrorsChoice numErrChoice = Utils.OR(options.get_num_err_choice(), NumErrorsChoice.ALL);
        Map<String, List<ErrorInfo>> errors = new HashMap<>();
        for (String component : common.allComponents) {
            switch (numErrChoice) {
                case NONE:
                    errors.put(component, Collections.emptyList());
                    break;
                case ONE:
                    List<ErrorInfo> errList = new ArrayList<>();
                    ErrorInfo info = state.lastError(topoId, component);
                    if (info != null) {
                        errList.add(info);
                    }
                    errors.put(component, errList);
                    break;
                case ALL:
                    errors.put(component, state.errors(topoId, component));
                    break;
                default:
                    LOG.warn("Got invalid NumErrorsChoice '{}'", numErrChoice);
                    errors.put(component, state.errors(topoId, component));
                    break;
            }
        }

        List<ExecutorSummary> summaries = new ArrayList<>();
        if (common.assignment != null) {
            for (Entry<List<Long>, NodeInfo> entry : common.assignment.get_executor_node_port().entrySet()) {
                NodeInfo ni = entry.getValue();
                ExecutorInfo execInfo = toExecInfo(entry.getKey());
                Map<String, String> nodeToHost = common.assignment.get_node_host();
                Map<String, Object> heartbeat = common.beats.get(ClientStatsUtil.convertExecutor(entry.getKey()));
                if (heartbeat == null) {
                    heartbeat = Collections.emptyMap();
                }
                ExecutorSummary summ = new ExecutorSummary(execInfo,
                                                           common.taskToComponent.get(execInfo.get_task_start()),
                                                           nodeToHost.get(ni.get_node()), ni.get_port_iterator().next().intValue(),
                                                           (Integer) heartbeat.getOrDefault("uptime", 0));

                //heartbeats "stats"
                Map ex = (Map) heartbeat.get("stats");
                if (ex != null) {
                    ExecutorStats stats = StatsUtil.thriftifyExecutorStats(ex);
                    summ.set_stats(stats);
                }
                summaries.add(summ);
            }
        }
        TopologyInfo topoInfo = new TopologyInfo(topoId, common.topoName, Time.deltaSecs(common.launchTimeSecs),
                                                 summaries, extractStatusStr(common.base), errors);
        if (common.topology.is_set_storm_version()) {
            topoInfo.set_storm_version(common.topology.get_storm_version());
        }
        if (common.base.is_set_owner()) {
            topoInfo.set_owner(common.base.get_owner());
        }
        String schedStatus = idToSchedStatus.get().get(topoId);
        if (schedStatus != null) {
            topoInfo.set_sched_status(schedStatus);
        }
        TopologyResources resources = getResourcesForTopology(topoId, common.base);
        if (resources != null && underlyingScheduler instanceof ResourceAwareScheduler) {
            topoInfo.set_requested_memonheap(resources.getRequestedMemOnHeap());
            topoInfo.set_requested_memoffheap(resources.getRequestedMemOffHeap());
            topoInfo.set_requested_cpu(resources.getRequestedCpu());
            topoInfo.set_assigned_memonheap(resources.getAssignedMemOnHeap());
            topoInfo.set_assigned_memoffheap(resources.getAssignedMemOffHeap());
            topoInfo.set_assigned_cpu(resources.getAssignedCpu());

        }
        if (common.base.is_set_component_debug()) {
            topoInfo.set_component_debug(common.base.get_component_debug());
        }
        topoInfo.set_replication_count(getBlobReplicationCount(ConfigUtils.masterStormCodeKey(topoId)));
        return topoInfo;
    }