public static ParserTopology build()

in metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java [100:184]


  /**
   * Assembles the parser topology: one Kafka spout per sensor, a single parser bolt fed by
   * every spout and, when requested, an error writer bolt attached to the parser bolt's
   * error stream.
   *
   * <p>Each supplier is resolved against the sensor parser configurations returned by
   * {@code getSensorParserConfig}. The list-valued suppliers are read by position, following
   * the iteration order of those configurations, so each list needs one element per sensor.
   *
   * @param zookeeperUrl handed to the configuration lookup and to every component factory
   * @param brokerUrl handed to the parser bolt and error bolt factories
   * @param sensorTypes the sensors to build for; the first one is used for the error bolt
   * @param spoutParallelismSupplier yields the per-spout parallelism hints
   * @param spoutNumTasksSupplier yields the per-spout task counts
   * @param parserParallelismSupplier yields the parser bolt parallelism hint
   * @param parserNumTasksSupplier yields the parser bolt task count
   * @param errorWriterParallelismSupplier yields the error writer bolt parallelism hint
   * @param errorWriterNumTasksSupplier yields the error writer bolt task count; the error
   *     bolt is only created when this is greater than zero
   * @param kafkaSpoutConfigSupplier yields the per-spout Kafka settings (entries may be null)
   * @param securityProtocolSupplier yields the security protocol (may be null)
   * @param outputTopicSupplier yields the output topic (may be null)
   * @param errorTopicSupplier yields the error topic; only consulted when the error bolt is
   *     created
   * @param stormConfigSupplier yields the Storm configuration bundled into the result
   * @return the topology builder together with the resolved Storm configuration
   * @throws Exception if any step of the assembly fails
   */
  public static ParserTopology build(String zookeeperUrl,
                                      Optional<String> brokerUrl,
                                      List<String> sensorTypes,
                                      ValueSupplier<List> spoutParallelismSupplier,
                                      ValueSupplier<List> spoutNumTasksSupplier,
                                      ValueSupplier<Integer> parserParallelismSupplier,
                                      ValueSupplier<Integer> parserNumTasksSupplier,
                                      ValueSupplier<Integer> errorWriterParallelismSupplier,
                                      ValueSupplier<Integer> errorWriterNumTasksSupplier,
                                      ValueSupplier<List> kafkaSpoutConfigSupplier,
                                      ValueSupplier<String> securityProtocolSupplier,
                                      ValueSupplier<String> outputTopicSupplier,
                                      ValueSupplier<String> errorTopicSupplier,
                                      ValueSupplier<Config> stormConfigSupplier
  ) throws Exception {

    // Load the per-sensor parser configuration; every supplier below is resolved against it.
    ParserConfigurations configs = new ParserConfigurations();
    Map<String, SensorParserConfig> configsBySensor =
        getSensorParserConfig(zookeeperUrl, sensorTypes, configs);
    Collection<SensorParserConfig> sensorConfigs = configsBySensor.values();

    // Resolve the topology settings. The suppliers hand back raw lists, hence the casts.
    @SuppressWarnings("unchecked")
    List<Integer> spoutParallelismHints =
        (List<Integer>) spoutParallelismSupplier.get(sensorConfigs, List.class);
    @SuppressWarnings("unchecked")
    List<Integer> spoutTaskCounts =
        (List<Integer>) spoutNumTasksSupplier.get(sensorConfigs, List.class);
    int parserParallelismHint = parserParallelismSupplier.get(sensorConfigs, Integer.class);
    int parserTaskCount = parserNumTasksSupplier.get(sensorConfigs, Integer.class);
    int errorWriterParallelismHint =
        errorWriterParallelismSupplier.get(sensorConfigs, Integer.class);
    int errorWriterTaskCount = errorWriterNumTasksSupplier.get(sensorConfigs, Integer.class);
    Optional<String> outputTopic =
        Optional.ofNullable(outputTopicSupplier.get(sensorConfigs, String.class));

    @SuppressWarnings("unchecked")
    List<Map<String, Object>> spoutKafkaSettings =
        (List<Map<String, Object>>) kafkaSpoutConfigSupplier.get(sensorConfigs, List.class);
    Optional<String> securityProtocol =
        Optional.ofNullable(securityProtocolSupplier.get(sensorConfigs, String.class));

    // One Kafka spout per sensor. The sensor name is only appended to the spout id when
    // several sensors share the topology.
    TopologyBuilder topology = new TopologyBuilder();
    boolean multipleSensors = configsBySensor.size() > 1;
    List<String> spoutIds = new ArrayList<>();
    int position = 0;
    for (Entry<String, SensorParserConfig> sensor : configsBySensor.entrySet()) {
      String sensorType = sensor.getKey();
      KafkaSpout spout = createKafkaSpout(
          zookeeperUrl,
          sensorType,
          securityProtocol,
          Optional.ofNullable(spoutKafkaSettings.get(position)),
          sensor.getValue());

      String spoutId;
      if (multipleSensors) {
        spoutId = "kafkaSpout-" + sensorType;
      } else {
        spoutId = "kafkaSpout";
      }

      topology.setSpout(spoutId, spout, spoutParallelismHints.get(position))
          .setNumTasks(spoutTaskCounts.get(position));
      spoutIds.add(spoutId);
      position++;
    }

    // A single parser bolt handles all sensors and subscribes to every spout.
    ParserBolt parser = createParserBolt(
        zookeeperUrl, brokerUrl, configsBySensor, securityProtocol, configs, outputTopic);
    BoltDeclarer parserDeclarer = topology
        .setBolt("parserBolt", parser, parserParallelismHint)
        .setNumTasks(parserTaskCount);
    spoutIds.forEach(parserDeclarer::localOrShuffleGrouping);

    // The error writer is optional. It is built from the first sensor type and the first
    // parser configuration, and listens on the parser bolt's error stream.
    if (errorWriterTaskCount > 0) {
      String errorTopic = errorTopicSupplier.get(sensorConfigs, String.class);
      String firstSensorType = sensorTypes.get(0);
      SensorParserConfig firstSensorConfig = sensorConfigs.iterator().next();
      WriterBolt errorWriter = createErrorBolt(
          zookeeperUrl,
          brokerUrl,
          firstSensorType,
          securityProtocol,
          configs,
          firstSensorConfig,
          errorTopic);
      topology.setBolt("errorMessageWriter", errorWriter, errorWriterParallelismHint)
          .setNumTasks(errorWriterTaskCount)
          .localOrShuffleGrouping("parserBolt", Constants.ERROR_STREAM);
    }

    Config stormConfig = stormConfigSupplier.get(sensorConfigs, Config.class);
    return new ParserTopology(topology, stormConfig);
  }