public static void main()

in beam-collector/src/main/java/com/google/collector/Collector.java [38:73]


  /**
   * Runs the collector pipeline.
   *
   * <p>Parses and validates {@link CollectorOptions} from the command line. Loads security reports
   * either from Bigtable or from a CSV input file, depending on the options. The reports are then
   * filtered, aggregated and clustered. The resulting clusters are written to either Bigtable or
   * JSON output. This method blocks until the pipeline run finishes.
   *
   * @param args command-line arguments, parsed as {@link CollectorOptions}
   */
  public static void main(String[] args) {
    CollectorOptions opts =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(CollectorOptions.class);
    Pipeline p = Pipeline.create(opts);

    // Build the sink up front; it only depends on the options, not on the pipeline graph.
    PersistenceTransform sink = buildPersistenceTransform(opts);

    PCollection<SecurityReport> input;
    if (opts.getLoadFromBigTable()) {
      input = LoadReports.fromBigTable(p, opts);
    } else {
      input = LoadReports.fromCsv(p, opts.getInputFile());
    }

    input
        // Drop reports that are unwanted or not actionable.
        .apply("Filter", Filter.by(new FilterReports()))
        // Merge repeated violations into aggregated reports.
        .apply("Aggregate", new Aggregate())
        // Group individual violations by their root cause.
        .apply("Cluster", new Cluster())
        // Serialize the clusters and hand them to the configured sink.
        .apply("Output", sink);

    p.run().waitUntilFinish();
  }

  /**
   * Selects the output sink described by {@code opts}.
   *
   * @param opts parsed collector options
   * @return a Bigtable sink when {@code getOutputToBigTable()} is true, otherwise a JSON sink
   *     targeting {@code getOutput()}
   */
  private static PersistenceTransform buildPersistenceTransform(CollectorOptions opts) {
    if (!opts.getOutputToBigTable()) {
      return Persist.toJson(opts.getOutput());
    }
    CloudBigtableTableConfiguration tableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(opts.getBigtableProjectId())
            .withInstanceId(opts.getBigtableInstanceId())
            .withTableId(opts.getBigtableTableId())
            .build();
    return Persist.toBigTable(tableConfig);
  }