public static void main()

in java/dataflow-connector-examples/src/main/java/com/google/cloud/bigtable/dataflow/example/PubsubWordCount.java [149:183]


  /**
   * Launches the streaming word-count pipeline and then starts the message injector.
   *
   * <p>The streaming pipeline reads strings from the Cloud Pub/Sub topic named in the options,
   * assigns them to fixed windows whose size (in minutes) comes from the options, applies
   * {@code ExtractWordsFn}, counts occurrences of each resulting element, applies
   * {@code MUTATION_TRANSFORM}, and writes the output to the configured Cloud Bigtable table.
   *
   * <p>The streaming job is submitted without waiting for it to finish, so that
   * {@code injectMessages} can run while the streaming job is still consuming the topic.
   *
   * @param args command-line arguments, parsed and validated as {@code BigtablePubsubOptions}
   * @throws Exception if option parsing, pipeline construction, or job submission fails
   */
  public static void main(String[] args) throws Exception {
    // BigtablePubsubOptions is one way to retrieve the options.  It's not required.
    BigtablePubsubOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtablePubsubOptions.class);

    // CloudBigtableTableConfiguration contains the project, instance and table to connect to.
    CloudBigtableTableConfiguration config =
        new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

    // DataflowRunner is forced so that both jobs are submitted to the Dataflow service,
    // which lets the streaming job and the injector job run at the same time.
    options.setRunner(DataflowRunner.class);

    options.as(DataflowPipelineOptions.class).setStreaming(true);
    Pipeline p = Pipeline.create(options);

    // Window length is expressed in minutes.
    FixedWindows window = FixedWindows.of(Duration.standardMinutes(options.getWindowSize()));

    p
        .apply(PubsubIO.readStrings().fromTopic(options.getPubsubTopic()))
        .apply(Window.<String> into(window))
        .apply(ParDo.of(new ExtractWordsFn()))
        .apply(Count.<String> perElement())
        .apply(ParDo.of(MUTATION_TRANSFORM))
        .apply(CloudBigtableIO.writeToTable(config));

    // Submit the streaming job but do NOT wait for it: a streaming pipeline does not reach a
    // terminal state on its own, so blocking here would prevent the injector below from ever
    // starting and the topic would never receive any messages.
    p.run();

    // Start a second job to inject messages into a Cloud Pubsub topic.
    // NOTE(review): the streaming job keeps running after this method returns and has to be
    // cancelled or drained separately - confirm that this matches the intended workflow.
    injectMessages(options);
  }