in metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java [100:184]
/**
 * Assembles a parser topology consisting of one Kafka spout per sensor, a single
 * parser bolt subscribed to every spout and, when the error writer task count is
 * positive, an error writer bolt attached to the parser bolt's error stream.
 *
 * <p>The per-sensor lists (spout parallelism, spout task counts, Kafka spout
 * settings) are read positionally, using the iteration order of the
 * sensor-to-config map returned by {@code getSensorParserConfig}. The error bolt is
 * created from the first entry of {@code sensorTypes} and the first parser config.
 *
 * @param zookeeperUrl passed to the configuration lookup and to every spout/bolt factory
 * @param brokerUrl passed to the parser bolt and error bolt factories
 * @param sensorTypes sensors to build spouts for; element 0 is also used for the error bolt
 * @param spoutParallelismSupplier supplies the per-sensor list of spout parallelism values
 * @param spoutNumTasksSupplier supplies the per-sensor list of spout task counts
 * @param parserParallelismSupplier supplies the parser bolt parallelism
 * @param parserNumTasksSupplier supplies the parser bolt task count
 * @param errorWriterParallelismSupplier supplies the error writer bolt parallelism
 * @param errorWriterNumTasksSupplier supplies the error writer task count; the error
 *        bolt is only created when this is greater than zero
 * @param kafkaSpoutConfigSupplier supplies the per-sensor list of Kafka spout settings;
 *        individual elements may be {@code null}
 * @param securityProtocolSupplier supplies the security protocol; may yield {@code null}
 * @param outputTopicSupplier supplies the output topic; may yield {@code null}
 * @param errorTopicSupplier supplies the error topic; only consulted when the error
 *        bolt is created
 * @param stormConfigSupplier supplies the Storm {@code Config} returned with the topology
 * @return the topology builder paired with the supplied Storm configuration
 * @throws Exception declared by the signature; the concrete failure modes originate in
 *         helpers that are not visible from this method
 */
public static ParserTopology build(String zookeeperUrl,
                                   Optional<String> brokerUrl,
                                   List<String> sensorTypes,
                                   ValueSupplier<List> spoutParallelismSupplier,
                                   ValueSupplier<List> spoutNumTasksSupplier,
                                   ValueSupplier<Integer> parserParallelismSupplier,
                                   ValueSupplier<Integer> parserNumTasksSupplier,
                                   ValueSupplier<Integer> errorWriterParallelismSupplier,
                                   ValueSupplier<Integer> errorWriterNumTasksSupplier,
                                   ValueSupplier<List> kafkaSpoutConfigSupplier,
                                   ValueSupplier<String> securityProtocolSupplier,
                                   ValueSupplier<String> outputTopicSupplier,
                                   ValueSupplier<String> errorTopicSupplier,
                                   ValueSupplier<Config> stormConfigSupplier
) throws Exception {

  // Load the parser configuration for every requested sensor.
  ParserConfigurations parserConfigurations = new ParserConfigurations();
  Map<String, SensorParserConfig> configBySensor =
      getSensorParserConfig(zookeeperUrl, sensorTypes, parserConfigurations);
  Collection<SensorParserConfig> allSensorConfigs = configBySensor.values();

  // Resolve every tunable up front, in a fixed order (suppliers are opaque here).
  @SuppressWarnings("unchecked")
  List<Integer> spoutParallelismBySensor =
      (List<Integer>) spoutParallelismSupplier.get(allSensorConfigs, List.class);
  @SuppressWarnings("unchecked")
  List<Integer> spoutTasksBySensor =
      (List<Integer>) spoutNumTasksSupplier.get(allSensorConfigs, List.class);
  int parserBoltParallelism = parserParallelismSupplier.get(allSensorConfigs, Integer.class);
  int parserBoltTasks = parserNumTasksSupplier.get(allSensorConfigs, Integer.class);
  int errorBoltParallelism = errorWriterParallelismSupplier.get(allSensorConfigs, Integer.class);
  int errorBoltTasks = errorWriterNumTasksSupplier.get(allSensorConfigs, Integer.class);
  String parserOutputTopic = outputTopicSupplier.get(allSensorConfigs, String.class);
  @SuppressWarnings("unchecked")
  List<Map<String, Object>> spoutSettingsBySensor =
      kafkaSpoutConfigSupplier.get(allSensorConfigs, List.class);
  Optional<String> securityProtocol =
      Optional.ofNullable(securityProtocolSupplier.get(allSensorConfigs, String.class));

  TopologyBuilder topology = new TopologyBuilder();

  // One spout per sensor; the sensor name is appended to the id only when several sensors share the topology.
  boolean multipleSensors = configBySensor.size() > 1;
  List<String> registeredSpoutIds = new ArrayList<>();
  int sensorIndex = 0;
  for (Entry<String, SensorParserConfig> sensorEntry : configBySensor.entrySet()) {
    String sensorType = sensorEntry.getKey();
    Map<String, Object> spoutSettings = spoutSettingsBySensor.get(sensorIndex);
    KafkaSpout spout = createKafkaSpout(
        zookeeperUrl,
        sensorType,
        securityProtocol,
        Optional.ofNullable(spoutSettings),
        sensorEntry.getValue());

    String spoutName;
    if (multipleSensors) {
      spoutName = "kafkaSpout-" + sensorType;
    } else {
      spoutName = "kafkaSpout";
    }

    topology.setSpout(spoutName, spout, spoutParallelismBySensor.get(sensorIndex))
        .setNumTasks(spoutTasksBySensor.get(sensorIndex));
    registeredSpoutIds.add(spoutName);
    sensorIndex++;
  }

  // A single parser bolt consumes from every spout registered above.
  final String parserBoltId = "parserBolt";
  ParserBolt parser = createParserBolt(
      zookeeperUrl,
      brokerUrl,
      configBySensor,
      securityProtocol,
      parserConfigurations,
      Optional.ofNullable(parserOutputTopic));
  BoltDeclarer parserDeclarer = topology
      .setBolt(parserBoltId, parser, parserBoltParallelism)
      .setNumTasks(parserBoltTasks);
  registeredSpoutIds.forEach(parserDeclarer::localOrShuffleGrouping);

  // The error writer is optional: it is only wired in for a positive task count.
  if (errorBoltTasks > 0) {
    String errorOutputTopic = errorTopicSupplier.get(allSensorConfigs, String.class);
    WriterBolt errorWriter = createErrorBolt(
        zookeeperUrl,
        brokerUrl,
        sensorTypes.get(0),
        securityProtocol,
        parserConfigurations,
        allSensorConfigs.iterator().next(),
        errorOutputTopic);
    topology.setBolt("errorMessageWriter", errorWriter, errorBoltParallelism)
        .setNumTasks(errorBoltTasks)
        .localOrShuffleGrouping(parserBoltId, Constants.ERROR_STREAM);
  }

  Config stormConfig = stormConfigSupplier.get(allSensorConfigs, Config.class);
  return new ParserTopology(topology, stormConfig);
}