in beam-collector/src/main/java/com/google/collector/Collector.java [38:73]
/**
 * Entry point of the collector pipeline.
 *
 * <p>Parses {@code args} into validated {@link CollectorOptions} and builds a Beam pipeline from
 * them. The pipeline:
 *
 * <ul>
 *   <li>loads {@code SecurityReport}s from Bigtable when {@code getLoadFromBigTable()} is set,
 *       otherwise from the CSV file named by {@code getInputFile()};
 *   <li>runs them through the Filter, Aggregate and Cluster steps;
 *   <li>hands the result to the output transform picked by {@code getOutputToBigTable()}.
 * </ul>
 *
 * <p>Finally runs the pipeline and calls {@code waitUntilFinish()} on the result.
 *
 * @param args command-line pipeline options
 */
public static void main(String[] args) {
  CollectorOptions opts =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(CollectorOptions.class);
  Pipeline p = Pipeline.create(opts);

  // Resolve the sink before wiring the graph so the transform chain below is sink-agnostic.
  PersistenceTransform sink = choosePersistenceTransform(opts);

  PCollection<SecurityReport> input;
  if (opts.getLoadFromBigTable()) {
    input = LoadReports.fromBigTable(p, opts);
  } else {
    input = LoadReports.fromCsv(p, opts.getInputFile());
  }

  input
      .apply("Filter", Filter.by(new FilterReports())) // drop unwanted / non-actionable reports
      .apply("Aggregate", new Aggregate()) // de-duplicate and merge repeated violations
      .apply("Cluster", new Cluster()) // group violations into root-cause clusters
      .apply("Output", sink); // serialize clusters to the configured sink

  p.run().waitUntilFinish();
}

/**
 * Picks the output transform for this run.
 *
 * @param opts parsed pipeline options
 * @return {@code Persist.toJson(opts.getOutput())} when {@code getOutputToBigTable()} is false;
 *     otherwise {@code Persist.toBigTable(...)} configured with the Bigtable project, instance
 *     and table IDs taken from {@code opts}
 */
private static PersistenceTransform choosePersistenceTransform(CollectorOptions opts) {
  if (!opts.getOutputToBigTable()) {
    return Persist.toJson(opts.getOutput());
  }
  CloudBigtableTableConfiguration tableConfig =
      new CloudBigtableTableConfiguration.Builder()
          .withProjectId(opts.getBigtableProjectId())
          .withInstanceId(opts.getBigtableInstanceId())
          .withTableId(opts.getBigtableTableId())
          .build();
  return Persist.toBigTable(tableConfig);
}