public static void main()

in kinesis-taxi-stream-producer/src/main/java/com/amazonaws/flink/refarch/StreamPopulator.java [82:119]


  /**
   * Command-line entry point: parses the arguments and either prints the usage text or
   * builds a {@code StreamPopulator} from them and starts replaying events.
   *
   * <p>Options that are not supplied fall back to the defaults hard-coded below. When
   * {@code -seek} is given, its value is handed to {@code DateTime} and the populator is
   * positioned there before {@code populate()} is invoked.
   *
   * @param args the raw command-line arguments
   * @throws ParseException propagated from the command-line parser
   */
  public static void main(String[] args) throws ParseException {
    // Register every supported option; the boolean flag marks options that take a value.
    Options cliOptions = new Options();
    cliOptions.addOption("region", true, "the region containing the kinesis stream");
    cliOptions.addOption("bucket", true, "the bucket containing the raw event data");
    cliOptions.addOption("prefix", true, "the prefix of the objects containing the raw event data");
    cliOptions.addOption("stream", true, "the name of the kinesis stream the events are sent to");
    cliOptions.addOption("speedup", true, "the speedup factor for replaying events into the kinesis stream");
    cliOptions.addOption("aggregate", "turn on aggregation of multiple events into a kinesis record");
    cliOptions.addOption("seek", true, "start replaying events at given timestamp");
    cliOptions.addOption("statisticsFrequency", true, "print statistics every statisticFrequency ms");
    cliOptions.addOption("adaptTime", true,"adapts the time of the events; shifts time origin to the invocation of the program (invocation) or sets the time to the ingestion of the event into the stream (ingestion)");
    cliOptions.addOption("noWatermark", "don't ingest watermarks into the stream");
    cliOptions.addOption("help", "print this help message");

    CommandLine cli = new DefaultParser().parse(cliOptions, args);

    // Asking for help short-circuits everything else.
    if (cli.hasOption("help")) {
      String programName = MethodHandles.lookup().lookupClass().getName();
      new HelpFormatter().printHelp(programName, cliOptions);
      return;
    }

    // Resolve each setting in constructor-argument order, so that a malformed number
    // is reported at the same point as when the lookups were inlined in the call.
    String region = cli.getOptionValue("region", "eu-west-1");
    String bucket = cli.getOptionValue("bucket", "aws-bigdata-blog");
    String prefix = cli.getOptionValue("prefix", "artifacts/flink-refarch/data/nyc-tlc-trips.snz/");
    String stream = cli.getOptionValue("stream", "taxi-trip-events");
    boolean aggregate = cli.hasOption("aggregate");
    float speedup = Float.parseFloat(cli.getOptionValue("speedup", "6480"));
    long statisticsFrequency = Long.parseLong(cli.getOptionValue("statisticsFrequency", "60000"));
    String adaptTime = cli.getOptionValue("adaptTime", "original");
    boolean noWatermark = cli.hasOption("noWatermark");

    StreamPopulator producer = new StreamPopulator(
        region, bucket, prefix, stream, aggregate, speedup, statisticsFrequency, adaptTime, noWatermark);

    // Optionally move the starting position to the requested timestamp.
    if (cli.hasOption("seek")) {
      producer.seek(new DateTime(cli.getOptionValue("seek")));
    }

    producer.populate();
  }