public static PipelineResult run()

in src/main/java/com/google/solutions/df/log/aggregations/FraudDetectionFinServTranPipeline.java [45:82]


  public static PipelineResult run(FraudDetectionFinServTranPipelineOptions options) {

    Pipeline p = Pipeline.create(options);
    PCollection<Row> transaction =
        p.apply(
            "ReadTransactionTransform",
            ReadTransactionTransform.newBuilder()
                .setFilePattern(options.getInputFilePattern())
                .setPollInterval(DEFAULT_POLL_INTERVAL)
                .setSubscriber(options.getSubscriberId())
                .build());

    PCollection<Row> predictionData =
        transaction.apply(
            "PredictTransform",
            PredictTransform.newBuilder()
                .setBatchSize(options.getBatchSize())
                .setModelId(options.getModelId())
                .setVersionId(options.getVersionId())
                .setProjectId(options.getProject())
                .setRandomKey(options.getKeyRange())
                .setProbability(options.getProbability())
                .build());
    transaction.apply(
        "InsertTransactionData",
        BQWriteTransform.newBuilder()
            .setTableSpec(options.getTableSpec())
            .setMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            .build());

    predictionData.apply(
        "StreamFraudData",
        BQWriteTransform.newBuilder()
            .setTableSpec(options.getOutlierTableSpec())
            .setMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            .build());
    return p.run();
  }