in src/main/java/com/google/solutions/df/log/aggregations/FraudDetectionFinServTranPipeline.java [45:82]
public static PipelineResult run(FraudDetectionFinServTranPipelineOptions options) {
  /*
   * Assembles and launches the fraud-detection pipeline.
   *
   * Graph shape:
   *   ReadTransactionTransform --> PredictTransform --> BigQuery (outlier table)
   *                           \-> BigQuery (transaction table)
   *
   * Both BigQuery sinks use streaming inserts. The value returned is whatever
   * Pipeline#run yields for the configured runner.
   */
  final Pipeline pipeline = Pipeline.create(options);

  // Shared by both BigQuery sinks below.
  final BigQueryIO.Write.Method insertMethod = BigQueryIO.Write.Method.STREAMING_INSERTS;

  // Source: transactions from the configured file pattern (polled at
  // DEFAULT_POLL_INTERVAL) and from the configured subscriber.
  final PCollection<Row> transactions =
      pipeline.apply(
          "ReadTransactionTransform",
          ReadTransactionTransform.newBuilder()
              .setFilePattern(options.getInputFilePattern())
              .setPollInterval(DEFAULT_POLL_INTERVAL)
              .setSubscriber(options.getSubscriberId())
              .build());

  // Scoring step. It is configured with the model/version to call, a batch
  // size, a key range and a probability value.
  // NOTE(review): the exact meaning of the key range and probability (e.g. a
  // fraud threshold) is defined in PredictTransform — confirm there.
  final PCollection<Row> predictions =
      transactions.apply(
          "PredictTransform",
          PredictTransform.newBuilder()
              .setBatchSize(options.getBatchSize())
              .setModelId(options.getModelId())
              .setVersionId(options.getVersionId())
              .setProjectId(options.getProject())
              .setRandomKey(options.getKeyRange())
              .setProbability(options.getProbability())
              .build());

  // Sink 1: every transaction read, unfiltered, into the main table.
  transactions.apply(
      "InsertTransactionData",
      BQWriteTransform.newBuilder()
          .setTableSpec(options.getTableSpec())
          .setMethod(insertMethod)
          .build());

  // Sink 2: prediction output into the outlier table.
  predictions.apply(
      "StreamFraudData",
      BQWriteTransform.newBuilder()
          .setTableSpec(options.getOutlierTableSpec())
          .setMethod(insertMethod)
          .build());

  return pipeline.run();
}