in java/dataflow-connector-examples/src/main/java/com/google/cloud/bigtable/dataflow/example/PubsubWordCount.java [149:183]
/**
 * Launches the streaming word-count pipeline together with the job that feeds it.
 *
 * <p>The streaming pipeline reads strings from the Pub/Sub topic named by
 * {@code options.getPubsubTopic()}, assigns them to fixed windows of
 * {@code options.getWindowSize()} minutes, applies {@code ExtractWordsFn}, counts each distinct
 * element per window, applies {@code MUTATION_TRANSFORM}, and writes the result to the Cloud
 * Bigtable table described by the Bigtable project/instance/table options.
 *
 * <p>The streaming job is submitted first, then {@code injectMessages} is invoked, and only
 * afterwards does this method wait on the streaming job. Waiting before injecting would block
 * on a streaming job that does not finish by itself, so the injector would never run alongside
 * it.
 *
 * @param args command-line arguments parsed into {@code BigtablePubsubOptions}
 * @throws Exception if building, submitting, or waiting on either job fails
 */
public static void main(String[] args) throws Exception {
  // BigtablePubsubOptions is one way to retrieve the options. It's not required.
  BigtablePubsubOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtablePubsubOptions.class);

  // CloudBigtableTableConfiguration contains the project, instance and table to connect to.
  CloudBigtableTableConfiguration config =
      new CloudBigtableTableConfiguration.Builder()
          .withProjectId(options.getBigtableProjectId())
          .withInstanceId(options.getBigtableInstanceId())
          .withTableId(options.getBigtableTableId())
          .build();

  // In order to cancel the pipelines automatically,
  // DataflowRunner is forced to be used.
  // Also enables the 2 jobs to run at the same time.
  options.setRunner(DataflowRunner.class);
  options.as(DataflowPipelineOptions.class).setStreaming(true);

  Pipeline p = Pipeline.create(options);
  FixedWindows window = FixedWindows.of(Duration.standardMinutes(options.getWindowSize()));

  p
      .apply(PubsubIO.readStrings().fromTopic(options.getPubsubTopic()))
      .apply(Window.<String> into(window))
      .apply(ParDo.of(new ExtractWordsFn()))
      .apply(Count.<String> perElement())
      .apply(ParDo.of(MUTATION_TRANSFORM))
      .apply(CloudBigtableIO.writeToTable(config));

  // Submit the streaming job without blocking on it yet. A streaming job does not terminate on
  // its own, so waiting here would keep the injector below from ever starting while the
  // streaming job is alive.
  // (Fully qualified because this type is not otherwise referenced by this method.)
  org.apache.beam.sdk.PipelineResult streamingJob = p.run();

  // Start a second job to inject messages into a Cloud Pubsub topic
  // NOTE(review): assumes the streaming job is already consuming the topic by the time the
  // injector publishes -- confirm against injectMessages and the runner's start-up behavior.
  injectMessages(options);

  // Keep the launcher attached to the streaming job, as the original code did.
  streamingJob.waitUntilFinish();
}