in src/main/java/com/google/solutions/df/log/aggregations/StreamingBenchmark.java [83:102]
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
/*
* Steps: 1) Trigger at the supplied QPS 2) Generate messages containing fake
* data 3) Write messages to Pub/Sub
*/
pipeline
.apply(
"Trigger",
GenerateSequence.from(0L).withRate(options.getQps(), Duration.standardSeconds(1L)))
.apply(
"GenerateMessages",
ParDo.of(new MessageGeneratorFn(options.getSchemaLocation(), options.getEventType())))
.apply("WriteToPubsub", PubsubIO.writeMessages().to(options.getTopic()));
return pipeline.run();
}