private StormTopology createTopology()

in storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java [97:175]


    /**
     * Assembles the linear DRPC topology around the given spout.
     *
     * <p>The resulting chain is: {@code spout} -> {@code PrepareRequest} bolt -> every configured
     * component (each wrapped in a {@code CoordinatedBolt}, in list order) -> {@code JoinResult}
     * -> {@code ReturnResults}.
     *
     * @param spout the DRPC spout registered under the id {@code "spout"}
     * @return the topology produced by the underlying {@code TopologyBuilder}
     * @throws IllegalStateException if no components have been added
     * @throws RuntimeException if the last bolt does not declare exactly one output stream, or if
     *     that stream does not have exactly two fields
     */
    private StormTopology createTopology(DRPCSpout spout) {
        final String SPOUT_ID = "spout";
        final String PREPARE_ID = "prepare-request";

        // Fail fast with a descriptive error: without this guard an empty component list only
        // surfaces further down as an IndexOutOfBoundsException from components.get(-1).
        if (components.isEmpty()) {
            throw new IllegalStateException("LinearDRPCTopology must contain at least one bolt");
        }

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SPOUT_ID, spout);
        builder.setBolt(PREPARE_ID, new PrepareRequest())
               .noneGrouping(SPOUT_ID);

        // 'i' is declared outside the loop on purpose: after the loop it equals components.size()
        // and is reused to derive the ids of the trailing join/return bolts.
        int i = 0;
        for (; i < components.size(); i++) {
            Component component = components.get(i);

            // Coordination sources handed to the CoordinatedBolt wrapper:
            //   first component  -> no entry,
            //   second component -> previous bolt with SourceArgs.single(),
            //   later components -> previous bolt with SourceArgs.all().
            Map<String, SourceArgs> source = new HashMap<String, SourceArgs>();
            if (i == 1) {
                source.put(boltId(i - 1), SourceArgs.single());
            } else if (i >= 2) {
                source.put(boltId(i - 1), SourceArgs.all());
            }

            // Only the last component gets an id-stream spec, and only when its bolt implements
            // FinishedCallback.
            IdStreamSpec idSpec = null;
            if (i == components.size() - 1 && component.bolt instanceof FinishedCallback) {
                idSpec = IdStreamSpec.makeDetectSpec(PREPARE_ID, PrepareRequest.ID_STREAM);
            }
            BoltDeclarer declarer = builder.setBolt(
                boltId(i),
                new CoordinatedBolt(component.bolt, source, idSpec),
                component.parallelism);

            for (SharedMemory request : component.sharedMemory) {
                declarer.addSharedMemory(request);
            }

            if (!component.componentConf.isEmpty()) {
                declarer.addConfigurations(component.componentConf);
            }

            if (idSpec != null) {
                // Subscribe to the id stream of the component named in the spec, grouped on "request".
                declarer.fieldsGrouping(
                    idSpec.getGlobalStreamId().get_componentId(),
                    PrepareRequest.ID_STREAM,
                    new Fields("request"));
            }
            if (i == 0 && component.declarations.isEmpty()) {
                // First component without explicit input declarations: default to a none grouping
                // on the prepare bolt's args stream.
                declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM);
            } else {
                // Apply the component's own input declarations against its predecessor, which is
                // the prepare bolt for the first component and the previous bolt otherwise.
                String prevId;
                if (i == 0) {
                    prevId = PREPARE_ID;
                } else {
                    prevId = boltId(i - 1);
                }
                for (InputDeclaration declaration : component.declarations) {
                    declaration.declare(prevId, declarer);
                }
            }
            if (i > 0) {
                // Every component after the first also listens to the previous bolt's
                // coordination stream via a direct grouping.
                declarer.directGrouping(boltId(i - 1), Constants.COORDINATED_STREAM_ID);
            }
        }

        // Validate the output contract of the last bolt before wiring the join bolt to it.
        IRichBolt lastBolt = components.get(components.size() - 1).bolt;
        OutputFieldsGetter getter = new OutputFieldsGetter();
        lastBolt.declareOutputFields(getter);
        Map<String, StreamInfo> streams = getter.getFieldsDeclaration();
        if (streams.size() != 1) {
            throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology");
        }
        String outputStream = streams.keySet().iterator().next();
        List<String> fields = streams.get(outputStream).get_output_fields();
        if (fields.size() != 2) {
            throw new RuntimeException(
                "Output stream of last component in LinearDRPCTopology must contain exactly two fields. "
                + "The first should be the request id, and the second should be the result.");
        }

        // Here i == components.size(): boltId(i) is the join bolt and boltId(i - 1) the last
        // component. The join bolt is grouped on the first output field of the last component and
        // on "request" from the prepare bolt's return stream.
        builder.setBolt(boltId(i), new JoinResult(PREPARE_ID))
               .fieldsGrouping(boltId(i - 1), outputStream, new Fields(fields.get(0)))
               .fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request"));
        i++;
        // The final bolt consumes the join bolt's output with a none grouping.
        builder.setBolt(boltId(i), new ReturnResults())
               .noneGrouping(boltId(i - 1));
        return builder.createTopology();
    }