public static void main()

in marketing-analytics/predicting/ml-data-windowing-pipeline/GenerateFeaturesPipeline.java [60:158]


  /**
   * Builds and launches the feature-generation pipeline.
   *
   * <p>The pipeline reads {@link LookbackWindow} records from Avro, passes them through
   * {@code ExtractFeatureFn} (with the grouped accumulator options supplied as a side input), and
   * writes the resulting table rows to the configured BigQuery destination table. The table
   * schema is not hard-coded: it is produced by {@code CreateTableSchemaFn} from the field schemas
   * of every accumulator option and handed to the BigQuery sink as a singleton view.
   *
   * @param args command-line arguments, parsed and validated into
   *     {@link GenerateFeaturesPipelineOptions}
   */
  public static void main(String[] args) {
    GenerateFeaturesPipelineOptions pipelineOptions =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(GenerateFeaturesPipelineOptions.class);
    Pipeline p = Pipeline.create(pipelineOptions);

    FeatureAccumulatorFactory accumulatorFactory = new FeatureAccumulatorFactory();

    PCollection<KV<String, AccumulatorOptions>> keyedOptions =
        getAllAccumulatorOptionsCollection(pipelineOptions, p, accumulatorFactory);

    // Side input for feature extraction: all accumulator options that share a key, exposed as a
    // map. Grouping first guarantees one map entry per key.
    PCollection<KV<String, Iterable<AccumulatorOptions>>> groupedOptions =
        keyedOptions.apply("Group accumulator options by column name", GroupByKey.create());
    PCollectionView<Map<String, Iterable<AccumulatorOptions>>> groupedOptionsView =
        groupedOptions.apply("Create accumulator options view", View.asMap());

    // Emits the field schemas declared by each individual accumulator option.
    DoFn<KV<String, AccumulatorOptions>, Iterable<Field>> toFieldSchemasFn =
        new DoFn<KV<String, AccumulatorOptions>, Iterable<Field>>() {
          @ProcessElement
          public void processElement(ProcessContext context) {
            context.output(context.element().getValue().createFieldSchemas());
          }
        };

    // Flattens the per-option field schemas into one collection holding every field.
    CombineFn<Iterable<Field>, List<Field>, Iterable<Field>> concatFieldsFn =
        new CombineFn<Iterable<Field>, List<Field>, Iterable<Field>>() {

          @Override
          public List<Field> createAccumulator() {
            return Lists.newArrayList();
          }

          @Override
          public List<Field> addInput(List<Field> collected, Iterable<Field> incoming) {
            for (Field field : incoming) {
              collected.add(field);
            }
            return collected;
          }

          @Override
          public List<Field> mergeAccumulators(Iterable<List<Field>> partials) {
            List<Field> merged = Lists.newArrayList();
            for (List<Field> partial : partials) {
              merged.addAll(partial);
            }
            return merged;
          }

          @Override
          public Iterable<Field> extractOutput(List<Field> collected) {
            return collected;
          }
        };

    PCollection<Iterable<Field>> fieldsPerOption =
        keyedOptions.apply("Create feature bq column names", ParDo.of(toFieldSchemasFn));
    PCollection<Iterable<Field>> allFields =
        fieldsPerOption.apply("Combine feature bq column names", Combine.globally(concatFieldsFn));

    // Singleton view consumed by the BigQuery sink via withSchemaFromView.
    PCollectionView<Map<String, String>> tableSchemaView =
        allFields
            .apply(
                "Create table schema",
                ParDo.of(
                    new CreateTableSchemaFn(
                        pipelineOptions.getFeatureDestinationTable(),
                        pipelineOptions.getTrainMode(),
                        pipelineOptions.getShowEffectiveDate(),
                        pipelineOptions.getShowStartTime(),
                        pipelineOptions.getShowEndTime(),
                        pipelineOptions.getShowEffectiveDateWeekOfYear(),
                        pipelineOptions.getShowEffectiveDateMonthOfYear())))
            .apply("Create table schema view", View.asSingleton());

    PCollection<LookbackWindow> lookbackWindows =
        p.apply(
            "Read Windowed Avro Facts",
            AvroIO.read(LookbackWindow.class).from(pipelineOptions.getWindowedAvroLocation()));

    // The destination table is created when missing and its previous contents are replaced.
    lookbackWindows
        .apply(
            "Extract Features",
            ParDo.of(
                    new ExtractFeatureFn(
                        accumulatorFactory,
                        groupedOptionsView,
                        pipelineOptions.getTrainMode(),
                        pipelineOptions.getShowEffectiveDate(),
                        pipelineOptions.getShowStartTime(),
                        pipelineOptions.getShowEndTime(),
                        pipelineOptions.getShowEffectiveDateWeekOfYear(),
                        pipelineOptions.getShowEffectiveDateMonthOfYear()))
                .withSideInputs(groupedOptionsView))
        .apply(
            BigQueryIO.writeTableRows()
                .to(pipelineOptions.getFeatureDestinationTable())
                .withSchemaFromView(tableSchemaView)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

    p.run();
  }