private static Map getGroupParallelisms()

in storm-client/src/jvm/org/apache/storm/trident/TridentTopology.java [336:384]


    private static Map<Group, Integer> getGroupParallelisms(DirectedGraph<Node, IndexedEdge> graph, GraphGrouper grouper,
                                                            Collection<Group> groups) {
        UndirectedGraph<Group, Object> equivs = new Pseudograph<>(Object.class);
        for (Group g : groups) {
            equivs.addVertex(g);
        }
        for (Group g : groups) {
            for (PartitionNode n : externalGroupInputs(g)) {
                if (isIdentityPartition(n)) {
                    Node parent = TridentUtils.getParent(graph, n);
                    Group parentGroup = grouper.nodeGroup(parent);
                    if (parentGroup != null && !parentGroup.equals(g)) {
                        equivs.addEdge(parentGroup, g);
                    }
                }
            }
        }

        Map<Group, Integer> ret = new HashMap<>();
        List<Set<Group>> equivGroups = new ConnectivityInspector<>(equivs).connectedSets();
        for (Set<Group> equivGroup : equivGroups) {
            Integer fixedP = getFixedParallelism(equivGroup);
            Integer maxP = getMaxParallelism(equivGroup);
            if (fixedP != null && maxP != null && maxP < fixedP) {
                throw new RuntimeException("Parallelism is fixed to " + fixedP + " but max parallelism is less than that: " + maxP);
            }


            Integer p = 1;
            for (Group g : equivGroup) {
                for (Node n : g.nodes) {
                    if (n.parallelismHint != null) {
                        p = Math.max(p, n.parallelismHint);
                    }
                }
            }
            if (maxP != null) {
                p = Math.min(maxP, p);
            }

            if (fixedP != null) {
                p = fixedP;
            }
            for (Group g : equivGroup) {
                ret.put(g, p);
            }
        }
        return ret;
    }