public static PipelineResult run()

in src/main/java/com/google/solutions/df/log/aggregations/StreamingBenchmark.java [83:102]


  /**
   * Builds the benchmark pipeline from the supplied options and runs it.
   *
   * <p>The pipeline consists of three named stages applied in order:
   *
   * <ol>
   *   <li>{@code "Trigger"}: a {@link GenerateSequence} starting at 0, rate-limited to
   *       {@code options.getQps()} elements per one-second period.
   *   <li>{@code "GenerateMessages"}: a {@link ParDo} over a {@code MessageGeneratorFn}
   *       constructed from {@code options.getSchemaLocation()} and
   *       {@code options.getEventType()}.
   *   <li>{@code "WriteToPubsub"}: writes the generated messages to the Pub/Sub topic given by
   *       {@code options.getTopic()}.
   * </ol>
   *
   * @param options the execution options used to create and configure the pipeline
   * @return the result returned by {@link Pipeline#run()}
   */
  public static PipelineResult run(Options options) {
    Pipeline benchmarkPipeline = Pipeline.create(options);

    // Stage 1 source: a counter starting at 0, throttled to the requested per-second rate.
    GenerateSequence trigger =
        GenerateSequence.from(0L).withRate(options.getQps(), Duration.standardSeconds(1L));

    // Each tick from the trigger is turned into a message, which is then written to Pub/Sub.
    benchmarkPipeline
        .apply("Trigger", trigger)
        .apply(
            "GenerateMessages",
            ParDo.of(new MessageGeneratorFn(options.getSchemaLocation(), options.getEventType())))
        .apply("WriteToPubsub", PubsubIO.writeMessages().to(options.getTopic()));

    PipelineResult result = benchmarkPipeline.run();
    return result;
  }