in src/main/java/com/google/solutions/df/log/aggregations/common/fraud/detection/PredictTransform.java [113:157]
public PCollection<Row> expand(PCollection<Row> input) {
  // Pipeline shape built by this method:
  //   1. Assign rows to 5-second fixed windows that fire once when the watermark passes the
  //      end of the window (no allowed lateness, fired panes discarded).
  //   2. Project each row onto Util.prerdictonInputSchema.
  //   3. Serialize the rows to JSON and attach a random integer key in [0, randomKey()).
  //   4. Hand the keyed JSON to BatchRequest and then PredictRemote.
  //      NOTE(review): BatchRequest / PredictRemote are defined elsewhere; presumably they
  //      group records per key and call the remote model -- confirm against those classes.

  // Validate the key range while the pipeline is being constructed so that a bad
  // configuration fails fast instead of failing on every element at runtime.
  final int keySpace = randomKey();
  if (keySpace <= 0) {
    throw new IllegalArgumentException("randomKey must be positive, got " + keySpace);
  }

  PCollection<Row> predictInput =
      input
          .apply(
              "Fixed Window",
              Window.<Row>into(FixedWindows.of(Duration.standardSeconds(5)))
                  .triggering(AfterWatermark.pastEndOfWindow())
                  .discardingFiredPanes()
                  .withAllowedLateness(Duration.ZERO))
          .apply(
              "ModifiedRow",
              ParDo.of(
                  new DoFn<Row, Row>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                      Row currentRow = c.element();
                      // Values are added positionally, so this order must match the field
                      // order of Util.prerdictonInputSchema.
                      Row modifiedRow =
                          Row.withSchema(Util.prerdictonInputSchema)
                              .addValues(
                                  currentRow.getValue("step"),
                                  currentRow.getValue("type"),
                                  currentRow.getValue("amount"),
                                  currentRow.getValue("oldbalanceOrg"),
                                  currentRow.getValue("newbalanceOrig"),
                                  currentRow.getValue("oldbalanceDest"),
                                  currentRow.getValue("newbalanceDest"),
                                  currentRow.getValue("transactionId"))
                              .build();
                      LOG.debug("Modified Row {}", modifiedRow);
                      c.output(modifiedRow);
                    }
                  }))
          .setRowSchema(Util.prerdictonInputSchema);

  return predictInput
      .apply("RowToJson", ToJson.of())
      .apply(
          "AddKey",
          // The key has to be drawn per element. Passing a pre-computed random number to
          // WithKeys.of(K) would stamp the same constant key on every record and funnel the
          // whole stream through a single key.
          WithKeys.of(
                  new org.apache.beam.sdk.transforms.SerializableFunction<String, Integer>() {
                    @Override
                    public Integer apply(String json) {
                      return java.util.concurrent.ThreadLocalRandom.current().nextInt(keySpace);
                    }
                  })
              .withKeyType(org.apache.beam.sdk.values.TypeDescriptors.integers()))
      .apply("Batch", ParDo.of(new BatchRequest(batchSize())))
      .apply(
          "Predict",
          ParDo.of(new PredictRemote(projectId(), modelId(), versionId(), probability())))
      .setRowSchema(Util.prerdictonOutputSchema);
}