private static void buildStreamDefinitions()

in flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java [179:275]


    private static void buildStreamDefinitions(ExecutionContext context, TopologyBuilder builder)
            throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException,
            IllegalAccessException, NoSuchFieldException {
        TopologyDef topologyDef = context.getTopologyDef();
        // process stream definitions
        HashMap<String, BoltDeclarer> declarers = new HashMap<String, BoltDeclarer>();
        for (StreamDef stream : topologyDef.getStreams()) {
            Object boltObj = context.getBolt(stream.getTo());
            BoltDeclarer declarer = declarers.get(stream.getTo());
            if (boltObj instanceof IRichBolt) {
                if (declarer == null) {
                    declarer = builder.setBolt(stream.getTo(),
                            (IRichBolt) boltObj,
                            topologyDef.parallelismForBolt(stream.getTo()));
                    declarers.put(stream.getTo(), declarer);
                }
            } else if (boltObj instanceof IBasicBolt) {
                if (declarer == null) {
                    declarer = builder.setBolt(
                            stream.getTo(),
                            (IBasicBolt) boltObj,
                            topologyDef.parallelismForBolt(stream.getTo()));
                    declarers.put(stream.getTo(), declarer);
                }
            } else if (boltObj instanceof IWindowedBolt) {
                if (declarer == null) {
                    declarer = builder.setBolt(
                            stream.getTo(),
                            (IWindowedBolt) boltObj,
                            topologyDef.parallelismForBolt(stream.getTo()));
                    declarers.put(stream.getTo(), declarer);
                }
            } else if (boltObj instanceof IStatefulBolt) {
                if (declarer == null) {
                    declarer = builder.setBolt(
                            stream.getTo(),
                            (IStatefulBolt) boltObj,
                            topologyDef.parallelismForBolt(stream.getTo()));
                    declarers.put(stream.getTo(), declarer);
                }
            } else {
                throw new IllegalArgumentException("Class does not appear to be a bolt: "
                        + boltObj.getClass().getName());
            }

            BoltDef boltDef = topologyDef.getBoltDef(stream.getTo());
            if (boltDef.getOnHeapMemoryLoad() > -1) {
                if (boltDef.getOffHeapMemoryLoad() > -1) {
                    declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad(), boltDef.getOffHeapMemoryLoad());
                } else {
                    declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad());
                }
            }
            if (boltDef.getCpuLoad() > -1) {
                declarer.setCPULoad(boltDef.getCpuLoad());
            }
            if (boltDef.getNumTasks() > -1) {
                declarer.setNumTasks(boltDef.getNumTasks());
            }

            GroupingDef grouping = stream.getGrouping();
            // if the streamId is defined, use it for the grouping, otherwise assume storm's default stream
            String streamId = (grouping.getStreamId() == null ? Utils.DEFAULT_STREAM_ID : grouping.getStreamId());


            switch (grouping.getType()) {
                case SHUFFLE:
                    declarer.shuffleGrouping(stream.getFrom(), streamId);
                    break;
                case FIELDS:
                    //TODO check for null grouping args
                    declarer.fieldsGrouping(stream.getFrom(), streamId, new Fields(grouping.getArgs()));
                    break;
                case ALL:
                    declarer.allGrouping(stream.getFrom(), streamId);
                    break;
                case DIRECT:
                    declarer.directGrouping(stream.getFrom(), streamId);
                    break;
                case GLOBAL:
                    declarer.globalGrouping(stream.getFrom(), streamId);
                    break;
                case LOCAL_OR_SHUFFLE:
                    declarer.localOrShuffleGrouping(stream.getFrom(), streamId);
                    break;
                case NONE:
                    declarer.noneGrouping(stream.getFrom(), streamId);
                    break;
                case CUSTOM:
                    declarer.customGrouping(stream.getFrom(), streamId,
                            buildCustomStreamGrouping(stream.getGrouping().getCustomClass(), context));
                    break;
                default:
                    throw new UnsupportedOperationException("unsupported grouping type: " + grouping);
            }
        }
    }