public StormTopology buildTopology()

in storm-client/src/jvm/org/apache/storm/trident/topology/TridentTopologyBuilder.java [130:263]


    /**
     * Assembles a {@code StormTopology} from the spouts, batch-per-tuple spouts and bolts
     * registered on this builder.
     *
     * <p>Wiring performed, in order:
     * <ul>
     *   <li>Each entry of {@code spouts} whose spout is an {@code IRichSpout} is added as a plain spout.
     *       Every other entry is cast to {@code ITridentSpout} and added as two bolts: a
     *       {@code TridentSpoutCoordinator} under {@code spoutCoordinator(id)} and a
     *       {@code TridentBoltExecutor} wrapping a {@code TridentSpoutExecutor} under {@code id}.</li>
     *   <li>Each entry of {@code batchPerTupleSpouts} is wrapped in a {@code RichSpoutBatchTriggerer}.</li>
     *   <li>One {@code MasterBatchCoordinator} spout is added per batch group collected in the first step.</li>
     *   <li>Each entry of {@code bolts} is wrapped in a {@code TridentBoltExecutor} and subscribed to its
     *       declared inputs, its coordination streams and, for committer batches, the commit stream.</li>
     * </ul>
     *
     * @param masterCoordResources resource settings applied to every master coordinator spout; read for
     *     {@code Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB},
     *     {@code Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB} and
     *     {@code Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT}. Must not be {@code null}; individual keys
     *     may be absent. The off-heap value is only applied when an on-heap value is also present.
     * @return the topology produced by the underlying {@code TopologyBuilder}
     */
    public StormTopology buildTopology(Map<String, Number> masterCoordResources) {
        TopologyBuilder topology = new TopologyBuilder();
        Map<GlobalStreamId, String> spoutStreamBatchIds = fleshOutStreamBatchIds(false);
        Map<GlobalStreamId, String> boltStreamBatchIds = fleshOutStreamBatchIds(true);

        // Per batch group: the commit-state ids and the Trident spouts that belong to it.
        // Both are consumed further down when the master coordinators are created.
        Map<String, List<String>> commitIdsByBatchGroup = new HashMap<>();
        Map<String, List<ITridentSpout>> spoutsByBatchGroup = new HashMap<>();

        for (String spoutId : spouts.keySet()) {
            TransactionalSpoutComponent component = spouts.get(spoutId);
            if (component.spout instanceof IRichSpout) {
                //TODO: wrap this to set the stream name
                topology.setSpout(spoutId, (IRichSpout) component.spout, component.parallelism);
                continue;
            }

            String batchGroup = component.batchGroupId;
            ITridentSpout tridentSpout = (ITridentSpout) component.spout;

            List<String> groupCommitIds = commitIdsByBatchGroup.get(batchGroup);
            if (groupCommitIds == null) {
                groupCommitIds = new ArrayList<>();
                commitIdsByBatchGroup.put(batchGroup, groupCommitIds);
            }
            groupCommitIds.add(component.commitStateId);

            List<ITridentSpout> groupSpouts = spoutsByBatchGroup.get(batchGroup);
            if (groupSpouts == null) {
                groupSpouts = new ArrayList<>();
                spoutsByBatchGroup.put(batchGroup, groupSpouts);
            }
            groupSpouts.add(tridentSpout);

            // Spout coordinator bolt: listens to the master coordinator's batch and success streams.
            BoltDeclarer coordinatorDeclarer = topology
                .setBolt(spoutCoordinator(spoutId), new TridentSpoutCoordinator(component.commitStateId, tridentSpout))
                .globalGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.BATCH_STREAM_ID)
                .globalGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID);
            for (SharedMemory sharedRequest : component.sharedMemory) {
                coordinatorDeclarer.addSharedMemory(sharedRequest);
            }
            coordinatorDeclarer.addConfigurations(component.componentConf);

            // Spout executor bolt, registered under the spout's own id.
            Map<String, CoordSpec> coordSpecs = new HashMap<>();
            coordSpecs.put(batchGroup, new CoordSpec());
            TridentSpoutExecutor spoutExecutor =
                new TridentSpoutExecutor(component.commitStateId, component.streamName, tridentSpout);
            BoltDeclarer executorDeclarer = topology.setBolt(
                spoutId, new TridentBoltExecutor(spoutExecutor, spoutStreamBatchIds, coordSpecs), component.parallelism);
            executorDeclarer.allGrouping(spoutCoordinator(spoutId), MasterBatchCoordinator.BATCH_STREAM_ID);
            executorDeclarer.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID);
            if (component.spout instanceof ICommitterTridentSpout) {
                executorDeclarer.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID);
            }
            executorDeclarer.addConfigurations(component.componentConf);
        }

        for (String spoutId : batchPerTupleSpouts.keySet()) {
            SpoutComponent component = batchPerTupleSpouts.get(spoutId);
            RichSpoutBatchTriggerer triggerer =
                new RichSpoutBatchTriggerer((IRichSpout) component.spout, component.streamName, component.batchGroupId);
            topology.setSpout(spoutId, triggerer, component.parallelism)
                    .addConfigurations(component.componentConf);
        }

        // Resource settings are looked up once and reused for every master coordinator.
        Number onHeap = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
        Number offHeap = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
        Number cpuLoad = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);

        for (Map.Entry<String, List<String>> group : commitIdsByBatchGroup.entrySet()) {
            String batchGroup = group.getKey();
            SpoutDeclarer masterCoord = topology.setSpout(
                masterCoordinator(batchGroup),
                new MasterBatchCoordinator(group.getValue(), spoutsByBatchGroup.get(batchGroup)));

            // Off-heap is only applied together with on-heap; on its own it is ignored.
            if (onHeap != null && offHeap != null) {
                masterCoord.setMemoryLoad(onHeap, offHeap);
            } else if (onHeap != null) {
                masterCoord.setMemoryLoad(onHeap);
            }

            if (cpuLoad != null) {
                masterCoord.setCPULoad(cpuLoad);
            }
        }

        for (String boltId : bolts.keySet()) {
            Component component = bolts.get(boltId);

            // One CoordSpec per batch group this bolt subscribes to, keyed by batch group id.
            Map<String, CoordSpec> coordSpecs = new HashMap<>();
            for (GlobalStreamId stream : getBoltSubscriptionStreams(boltId)) {
                String batchGroup = boltStreamBatchIds.get(stream);
                CoordSpec spec = coordSpecs.get(batchGroup);
                if (spec == null) {
                    spec = new CoordSpec();
                    coordSpecs.put(batchGroup, spec);
                }
                String upstreamId = stream.get_componentId();
                // Batch-per-tuple spouts get "single" coordination; every other upstream gets "all".
                CoordType coordType = batchPerTupleSpouts.containsKey(upstreamId) ? CoordType.single() : CoordType.all();
                spec.coords.put(upstreamId, coordType);
            }

            // Record the commit stream on each committer batch's spec before the specs are handed to the executor.
            // NOTE(review): assumes every committer batch already has a spec from the loop above - confirm.
            for (String committerBatch : component.committerBatches) {
                coordSpecs.get(committerBatch).commitStream =
                    new GlobalStreamId(masterCoordinator(committerBatch), MasterBatchCoordinator.COMMIT_STREAM_ID);
            }

            BoltDeclarer boltDeclarer = topology.setBolt(
                boltId, new TridentBoltExecutor(component.bolt, boltStreamBatchIds, coordSpecs), component.parallelism);
            for (SharedMemory sharedRequest : component.sharedMemory) {
                boltDeclarer.addSharedMemory(sharedRequest);
            }
            boltDeclarer.addConfigurations(component.componentConf);

            for (InputDeclaration declaration : component.declarations) {
                declaration.declare(boltDeclarer);
            }

            // Subscribe to the coordination stream of every upstream component, grouped by batch.
            Map<String, Set<String>> componentsByBatch = getBoltBatchToComponentSubscriptions(boltId);
            for (Map.Entry<String, Set<String>> subscription : componentsByBatch.entrySet()) {
                String coordStreamId = TridentBoltExecutor.coordStream(subscription.getKey());
                for (String upstreamId : subscription.getValue()) {
                    boltDeclarer.directGrouping(upstreamId, coordStreamId);
                }
            }

            for (String committerBatch : component.committerBatches) {
                boltDeclarer.allGrouping(masterCoordinator(committerBatch), MasterBatchCoordinator.COMMIT_STREAM_ID);
            }
        }

        return topology.createTopology();
    }