in marketing-analytics/predicting/ml-data-windowing-pipeline/DataVisualizationPipeline.java [36:82]
/**
 * Builds and launches the data visualization pipeline.
 *
 * <p>Sessions are read from the Avro location given by the pipeline options and fanned out into
 * two independent branches, each ending in a BigQuery write:
 *
 * <ul>
 *   <li>Facts: each session is mapped to facts, converted to table rows, and written to the
 *       table named by {@code getOutputBigQueryFactsTable()}.
 *   <li>User activity: sessions are keyed by user id, grouped, sorted by time, turned into user
 *       activities using the snapshot/slide/lookahead options, converted to table rows, and
 *       written to the table named by {@code getOutputBigQueryUserActivityTable()}.
 * </ul>
 *
 * <p>Both writes create the destination table if needed and truncate any existing contents.
 * The pipeline is started with {@code run()}; this method does not wait on the returned result.
 *
 * @param args command-line arguments parsed into {@link DataVisualizationPipelineOptions}
 */
public static void main(String[] args) {
  final DataVisualizationPipelineOptions options =
      PipelineOptionsFactory.fromArgs(args)
          .withValidation()
          .as(DataVisualizationPipelineOptions.class);
  final Pipeline pipeline = Pipeline.create(options);

  // Both BigQuery sinks share the same create/write behavior.
  final BigQueryIO.Write.CreateDisposition createIfNeeded =
      BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;
  final BigQueryIO.Write.WriteDisposition truncateExisting =
      BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE;

  // Single source feeding both output branches below.
  final PCollection<Session> allSessions =
      pipeline.apply(
          AvroIO.read(Session.class).from(options.getInputAvroSessionsLocation()));

  // Branch 1: output Facts BigQuery table.
  final MapSessionToFacts sessionToFacts = new MapSessionToFacts();
  final MapFactToTableRow factToTableRow = new MapFactToTableRow();
  allSessions
      .apply("MapSessionToFacts", ParDo.of(sessionToFacts))
      .apply("MapFactToTableRow", ParDo.of(factToTableRow))
      .apply(
          "WriteFactsToBigQuery",
          BigQueryIO.writeTableRows()
              .to(options.getOutputBigQueryFactsTable())
              .withoutValidation()
              .withSchema(MapFactToTableRow.getTableSchema())
              .withCreateDisposition(createIfNeeded)
              .withWriteDisposition(truncateExisting));

  // Branch 2: output UserActivity BigQuery table.
  final MapUserIdToSession userIdToSession = new MapUserIdToSession();
  final SortSessionsByTime sortSessionsByTime = new SortSessionsByTime();
  final MapSortedSessionsToUserActivities sortedSessionsToUserActivities =
      new MapSortedSessionsToUserActivities(
          options.getSnapshotStartDate(),
          options.getSnapshotEndDate(),
          options.getSlideTimeInSeconds(),
          options.getMinimumLookaheadTimeInSeconds(),
          options.getMaximumLookaheadTimeInSeconds(),
          options.getStopOnFirstPositiveLabel());
  final MapUserActivityToTableRow userActivityToTableRow = new MapUserActivityToTableRow();
  allSessions
      .apply("MapUserIdToSession", ParDo.of(userIdToSession))
      .apply(GroupByKey.<String, Session>create())
      .apply("SortSessionsByTime", ParDo.of(sortSessionsByTime))
      .apply("MapSortedSessionsToUserActivities", ParDo.of(sortedSessionsToUserActivities))
      .apply("MapUserActivityToTableRow", ParDo.of(userActivityToTableRow))
      .apply(
          "WriteUserActivitiesToBigQuery",
          BigQueryIO.writeTableRows()
              .to(options.getOutputBigQueryUserActivityTable())
              .withoutValidation()
              .withSchema(MapUserActivityToTableRow.getTableSchema())
              .withCreateDisposition(createIfNeeded)
              .withWriteDisposition(truncateExisting));

  pipeline.run();
}