public PCollection expand()

in src/main/java/com/google/solutions/df/log/aggregations/common/fraud/detection/PredictTransform.java [113:157]


  public PCollection<Row> expand(PCollection<Row> input) {

    PCollection<Row> predictInput =
        input
            .apply(
                "Fixed Window",
                Window.<Row>into(FixedWindows.of(Duration.standardSeconds(5)))
                    .triggering(AfterWatermark.pastEndOfWindow())
                    .discardingFiredPanes()
                    .withAllowedLateness(Duration.ZERO))
            .apply(
                "ModifiedRow",
                ParDo.of(
                    new DoFn<Row, Row>() {
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        Row currentRow = c.element();
                        Row modifiedRow =
                            Row.withSchema(Util.prerdictonInputSchema)
                                .addValues(
                                    currentRow.getValue("step"),
                                    currentRow.getValue("type"),
                                    currentRow.getValue("amount"),
                                    currentRow.getValue("oldbalanceOrg"),
                                    currentRow.getValue("newbalanceOrig"),
                                    currentRow.getValue("oldbalanceDest"),
                                    currentRow.getValue("newbalanceDest"),
                                    currentRow.getValue("transactionId"))
                                .build();

                        LOG.debug("Modified Row {}", modifiedRow);
                        c.output(modifiedRow);
                      }
                    }))
            .setRowSchema(Util.prerdictonInputSchema);

    return predictInput
        .apply("RowToJson", ToJson.of())
        .apply("AddKey", WithKeys.of(new Random().nextInt(randomKey())))
        .apply("Batch", ParDo.of(new BatchRequest(batchSize())))
        .apply(
            "Predict",
            ParDo.of(new PredictRemote(projectId(), modelId(), versionId(), probability())))
        .setRowSchema(Util.prerdictonOutputSchema);
  }