public StormTopology build()

in storm-client/src/jvm/org/apache/storm/trident/TridentTopology.java [749:954]


    /**
     * Compiles the Trident graph held by this object into a {@link StormTopology}.
     *
     * <p>The method works on a clone of {@code this.graph} and proceeds in phases:
     * <ol>
     *   <li>complete DRPC handling via {@code completeDrpc};</li>
     *   <li>split vertices into spout nodes and bolt-candidate nodes (partition nodes are in neither set);</li>
     *   <li>build initial groups (one per colocation list, one per remaining node) and merge them
     *       with {@code GraphGrouper.mergeFully()};</li>
     *   <li>insert an identity partition on every non-partition edge that crosses group boundaries
     *       or whose source has no group;</li>
     *   <li>rewrite each partition node reported by {@code extraPartitionInputs} into
     *       {@code parent -> identity node -> new partition node};</li>
     *   <li>register spouts as groups, reindex, and label each connected component with a batch group id;</li>
     *   <li>declare spouts and bolts (with parallelism, resources and groupings) on a
     *       {@code TridentTopologyBuilder} and build the topology.</li>
     * </ol>
     *
     * @return the topology produced by {@code TridentTopologyBuilder.buildTopology}
     * @throws RuntimeException if an edge source has no group but is not a spout node, or if a
     *     non-DRPC spout is neither an {@code IBatchSpout} nor an {@code ITridentSpout}
     */
    public StormTopology build() {
        DefaultDirectedGraph<Node, IndexedEdge> graph = (DefaultDirectedGraph) this.graph.clone();

        completeDrpc(graph, colocate, gen);

        List<SpoutNode> spoutNodes = new ArrayList<>();

        // can be regular nodes (static state) or processor nodes
        Set<Node> boltNodes = new LinkedHashSet<>();
        for (Node n : graph.vertexSet()) {
            if (n instanceof SpoutNode) {
                spoutNodes.add((SpoutNode) n);
            } else if (!(n instanceof PartitionNode)) {
                boltNodes.add(n);
            }
        }

        // Colocated node lists each become one group; every remaining bolt node gets a group of its own.
        Set<Group> initialGroups = new LinkedHashSet<>();
        for (List<Node> colocatedNodes : this.colocate.values()) {
            Group g = new Group(graph, colocatedNodes);
            boltNodes.removeAll(colocatedNodes);
            initialGroups.add(g);
        }
        for (Node n : boltNodes) {
            initialGroups.add(new Group(graph, n));
        }

        GraphGrouper grouper = new GraphGrouper(graph, initialGroups);
        grouper.mergeFully();
        Collection<Group> mergedGroups = grouper.getAllGroups();

        // add identity partitions between groups
        // (iterate over a copy of the edge set because the loop body mutates the graph)
        for (IndexedEdge<Node> e : new HashSet<>(graph.edgeSet())) {
            if (!(e.source instanceof PartitionNode) && !(e.target instanceof PartitionNode)) {
                Group g1 = grouper.nodeGroup(e.source);
                Group g2 = grouper.nodeGroup(e.target);
                // g1 being null means the source is a spout node
                if (g1 == null && !(e.source instanceof SpoutNode)) {
                    throw new RuntimeException("Planner exception: Null source group must indicate a spout node at this phase of planning");
                }
                if (g1 == null || !g1.equals(g2)) {
                    graph.removeEdge(e);
                    PartitionNode partitionNode = makeIdentityPartition(e.source);
                    graph.addVertex(partitionNode);
                    graph.addEdge(e.source, partitionNode, new IndexedEdge(e.source, partitionNode, 0));
                    graph.addEdge(partitionNode, e.target, new IndexedEdge(partitionNode, e.target, e.index));
                }
            }
        }
        // if one group subscribes to the same stream with same partitioning multiple times,
        // merge those together (otherwise can end up with many output streams created for that partitioning
        // if need to split into multiple output streams because of same input having different
        // partitioning to the group)

        // this is because can't currently merge splitting logic into a spout
        // not the most kosher algorithm here, since the grouper indexes are being trounced via the adding of nodes to random groups, but it
        // works out
        List<Node> forNewGroups = new ArrayList<>();
        for (Group g : mergedGroups) {
            for (PartitionNode n : extraPartitionInputs(g)) {
                Node idNode = makeIdentityNode(n.allOutputFields);
                Node newPartitionNode = new PartitionNode(idNode.streamId, n.name, idNode.allOutputFields, n.thriftGrouping);

                // The parent and the outgoing edges of n must be captured BEFORE n is removed:
                // once the vertex is gone the graph can no longer answer queries about it.
                // The edges are copied because the returned set may be a live view that the
                // removal below would empty (NOTE(review): confirm against the JGraphT version in use).
                Node parentNode = TridentUtils.getParent(graph, n);
                List<IndexedEdge> outgoing = new ArrayList<>(graph.outgoingEdgesOf(n));
                graph.removeVertex(n);

                graph.addVertex(idNode);
                graph.addVertex(newPartitionNode);
                addEdge(graph, parentNode, idNode, 0);
                addEdge(graph, idNode, newPartitionNode, 0);
                // Re-attach every former consumer of n to the replacement partition node.
                for (IndexedEdge e : outgoing) {
                    addEdge(graph, newPartitionNode, e.target, e.index);
                }
                Group parentGroup = grouper.nodeGroup(parentNode);
                if (parentGroup == null) {
                    // No group for the parent: the identity node gets its own group further down.
                    forNewGroups.add(idNode);
                } else {
                    parentGroup.nodes.add(idNode);
                }
            }
        }
        // TODO: in the future, want a way to include this logic in the spout itself,
        // or make it unnecessary by having storm include metadata about which grouping a tuple
        // came from

        for (Node n : forNewGroups) {
            grouper.addGroup(new Group(graph, n));
        }

        // add in spouts as groups so we can get parallelisms
        for (Node n : spoutNodes) {
            grouper.addGroup(new Group(graph, n));
        }

        grouper.reindex();
        mergedGroups = grouper.getAllGroups();

        // Every connected component of the graph is assigned its own batch group id ("bg0", "bg1", ...).
        Map<Node, String> batchGroupMap = new HashMap<>();
        List<Set<Node>> connectedComponents = new ConnectivityInspector<>(graph).connectedSets();
        for (int i = 0; i < connectedComponents.size(); i++) {
            String groupId = "bg" + i;
            for (Node n : connectedComponents.get(i)) {
                batchGroupMap.put(n, groupId);
            }
        }

        Map<Group, Integer> parallelisms = getGroupParallelisms(graph, grouper, mergedGroups);

        TridentTopologyBuilder builder = new TridentTopologyBuilder();

        Map<Node, String> spoutIds = genSpoutIds(spoutNodes);
        Map<Group, String> boltIds = genBoltIds(mergedGroups);

        for (SpoutNode sn : spoutNodes) {
            Integer parallelism = parallelisms.get(grouper.nodeGroup(sn));

            // Per-spout resources override the topology-wide defaults.
            Map<String, Number> spoutRes = new HashMap<>(resourceDefaults);
            spoutRes.putAll(sn.getResources());

            Number onHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
            Number offHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
            Number cpuLoad = spoutRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);

            SpoutDeclarer spoutDeclarer;

            if (sn.type == SpoutNode.SpoutType.DRPC) {

                spoutDeclarer = builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId,
                                                              (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn));
            } else {
                ITridentSpout s;
                if (sn.spout instanceof IBatchSpout) {
                    s = new BatchSpoutExecutor((IBatchSpout) sn.spout);
                } else if (sn.spout instanceof ITridentSpout) {
                    s = (ITridentSpout) sn.spout;
                } else {
                    throw new RuntimeException("Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor");
                    // TODO: handle regular rich spout without batches (need lots of updates to support this throughout)
                }
                spoutDeclarer = builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn));
            }

            if (onHeap != null) {
                if (offHeap != null) {
                    spoutDeclarer.setMemoryLoad(onHeap, offHeap);
                } else {
                    spoutDeclarer.setMemoryLoad(onHeap);
                }
            }

            if (cpuLoad != null) {
                spoutDeclarer.setCPULoad(cpuLoad);
            }
        }

        for (Group g : mergedGroups) {
            if (!isSpoutGroup(g)) {
                Integer p = parallelisms.get(g);
                Map<String, String> streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap);
                Map<String, Number> groupRes = g.getResources(resourceDefaults);

                Number onHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
                Number offHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
                Number cpuLoad = groupRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);

                BoltDeclarer d = builder.setBolt(boltIds.get(g), new SubtopologyBolt(graph, g.nodes, batchGroupMap), p,
                                                 committerBatches(g, batchGroupMap), streamToGroup);

                if (onHeap != null) {
                    if (offHeap != null) {
                        d.setMemoryLoad(onHeap, offHeap);
                    } else {
                        d.setMemoryLoad(onHeap);
                    }
                }

                if (cpuLoad != null) {
                    d.setCPULoad(cpuLoad);
                }

                for (SharedMemory request : g.getSharedMemory()) {
                    d.addSharedMemory(request);
                }

                // Subscribe the bolt to each distinct external partition input, addressed by the
                // component id of the partition's parent (a spout id or the parent's group bolt id).
                Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g));
                for (PartitionNode n : inputs) {
                    Node parent = TridentUtils.getParent(graph, n);
                    String componentId = parent instanceof SpoutNode
                            ? spoutIds.get(parent)
                            : boltIds.get(grouper.nodeGroup(parent));
                    d.grouping(new GlobalStreamId(componentId, n.streamId), n.thriftGrouping);
                }
            }
        }
        // Master coordinator resources: explicit settings layered over the defaults.
        HashMap<String, Number> combinedMasterCoordResources = new HashMap<>(resourceDefaults);
        combinedMasterCoordResources.putAll(masterCoordResources);
        return builder.buildTopology(combinedMasterCoordResources);
    }