in flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java [179:275]
private static void buildStreamDefinitions(ExecutionContext context, TopologyBuilder builder)
throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException,
IllegalAccessException, NoSuchFieldException {
TopologyDef topologyDef = context.getTopologyDef();
// process stream definitions
HashMap<String, BoltDeclarer> declarers = new HashMap<String, BoltDeclarer>();
for (StreamDef stream : topologyDef.getStreams()) {
Object boltObj = context.getBolt(stream.getTo());
BoltDeclarer declarer = declarers.get(stream.getTo());
if (boltObj instanceof IRichBolt) {
if (declarer == null) {
declarer = builder.setBolt(stream.getTo(),
(IRichBolt) boltObj,
topologyDef.parallelismForBolt(stream.getTo()));
declarers.put(stream.getTo(), declarer);
}
} else if (boltObj instanceof IBasicBolt) {
if (declarer == null) {
declarer = builder.setBolt(
stream.getTo(),
(IBasicBolt) boltObj,
topologyDef.parallelismForBolt(stream.getTo()));
declarers.put(stream.getTo(), declarer);
}
} else if (boltObj instanceof IWindowedBolt) {
if (declarer == null) {
declarer = builder.setBolt(
stream.getTo(),
(IWindowedBolt) boltObj,
topologyDef.parallelismForBolt(stream.getTo()));
declarers.put(stream.getTo(), declarer);
}
} else if (boltObj instanceof IStatefulBolt) {
if (declarer == null) {
declarer = builder.setBolt(
stream.getTo(),
(IStatefulBolt) boltObj,
topologyDef.parallelismForBolt(stream.getTo()));
declarers.put(stream.getTo(), declarer);
}
} else {
throw new IllegalArgumentException("Class does not appear to be a bolt: "
+ boltObj.getClass().getName());
}
BoltDef boltDef = topologyDef.getBoltDef(stream.getTo());
if (boltDef.getOnHeapMemoryLoad() > -1) {
if (boltDef.getOffHeapMemoryLoad() > -1) {
declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad(), boltDef.getOffHeapMemoryLoad());
} else {
declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad());
}
}
if (boltDef.getCpuLoad() > -1) {
declarer.setCPULoad(boltDef.getCpuLoad());
}
if (boltDef.getNumTasks() > -1) {
declarer.setNumTasks(boltDef.getNumTasks());
}
GroupingDef grouping = stream.getGrouping();
// if the streamId is defined, use it for the grouping, otherwise assume storm's default stream
String streamId = (grouping.getStreamId() == null ? Utils.DEFAULT_STREAM_ID : grouping.getStreamId());
switch (grouping.getType()) {
case SHUFFLE:
declarer.shuffleGrouping(stream.getFrom(), streamId);
break;
case FIELDS:
//TODO check for null grouping args
declarer.fieldsGrouping(stream.getFrom(), streamId, new Fields(grouping.getArgs()));
break;
case ALL:
declarer.allGrouping(stream.getFrom(), streamId);
break;
case DIRECT:
declarer.directGrouping(stream.getFrom(), streamId);
break;
case GLOBAL:
declarer.globalGrouping(stream.getFrom(), streamId);
break;
case LOCAL_OR_SHUFFLE:
declarer.localOrShuffleGrouping(stream.getFrom(), streamId);
break;
case NONE:
declarer.noneGrouping(stream.getFrom(), streamId);
break;
case CUSTOM:
declarer.customGrouping(stream.getFrom(), streamId,
buildCustomStreamGrouping(stream.getGrouping().getCustomClass(), context));
break;
default:
throw new UnsupportedOperationException("unsupported grouping type: " + grouping);
}
}
}