public static PipelineResult run(SecureLogAggregationPipelineOptions options)

in src/main/java/com/google/solutions/df/log/aggregations/SecureLogAggregationPipeline.java [61:148]


  /**
   * Assembles and launches the secure log aggregation pipeline.
   *
   * <p>The pipeline reads flow-log rows from GCS and Pub/Sub, optionally stores raw logs enriched
   * with IP geo data, then windows the rows, extracts aggregate features, de-identifies them via
   * DLP, writes the features to BigQuery, and finally scores each feature row against k-means
   * centroids (read from BigQuery as a side input) to stream outliers to a separate table.
   *
   * @param options pipeline options controlling sources, tables, DLP templates, and windowing
   * @return the {@link PipelineResult} handle for the launched pipeline
   */
  public static PipelineResult run(SecureLogAggregationPipelineOptions options) {

    Pipeline pipeline = Pipeline.create(options);

    // Ingest flow-log rows from both a GCS file pattern and a Pub/Sub subscription.
    PCollection<Row> flowLogRows =
        pipeline.apply(
            "Read FlowLog Data",
            ReadFlowLogTransform.newBuilder()
                .setFilePattern(options.getInputFilePattern())
                .setPollInterval(DEFAULT_POLL_INTERVAL)
                .setSubscriber(options.getSubscriberId())
                .build());

    // Side input: latest normalized centroid data (id, radius, feature vector) from BigQuery.
    PCollectionView<List<CentroidVector>> centroidView =
        pipeline
            .apply(
                "Latest Normalized Data",
                BigQueryIO.read(new ClusterDataMapElement())
                    .fromQuery(Util.getClusterDetails(options.getClusterQuery()))
                    .usingStandardSql()
                    .withMethod(Method.EXPORT))
            .apply("Centroids Data as Input", View.asList());

    // Optional branch: when a log table is configured, persist the raw rows after
    // mapping each source IP to its country of origin.
    if (options.getLogTableSpec() != null) {
      PCollection<Row> geoEnrichedRows =
          flowLogRows
              .apply(
                  "ConvertIpToGeo",
                  RawLogDataTransform.newBuilder().setDbPath(options.getGeoDbpath()).build())
              .setRowSchema(Util.networkLogSchemaWithGeo);
      geoEnrichedRows.apply(
          "Batch to Log Table",
          BQWriteTransform.newBuilder()
              .setTableSpec(options.getLogTableSpec())
              .setBatchFrequency(options.getBatchFrequency())
              .setMethod(options.getWriteMethod())
              .setClusterFields(Util.getRawTableClusterFields())
              .setGcsTempLocation(StaticValueProvider.of(options.getCustomGcsTempLocation()))
              .build());
    }

    // Fixed windows fired once at the watermark; no late data is accepted.
    Window<Row> fixedWindow =
        Window.<Row>into(FixedWindows.of(Duration.standardSeconds(options.getWindowInterval())))
            .triggering(AfterWatermark.pastEndOfWindow())
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO);

    // Window, aggregate into feature rows, and de-identify sensitive fields via DLP.
    // The random key spreads DLP requests across workers based on the expected
    // per-worker ingestion rate.
    PCollection<Row> featureRows =
        flowLogRows
            .apply("Fixed Window", fixedWindow)
            .apply("Feature Extraction", new LogRowTransform())
            .apply(
                "DLP Transformation",
                DLPTransform.newBuilder()
                    .setBatchSize(options.getBatchSize())
                    .setDeidTemplateName(options.getDeidTemplateName())
                    .setInspectTemplateName(options.getInspectTemplateName())
                    .setProjectId(options.getProject())
                    .setRandomKey(
                        Util.randomKeyGenerator(
                            Math.floorDiv(
                                options.getIngestionRate().intValue(), options.getMaxNumWorkers())))
                    .build())
            .setRowSchema(Util.bqLogSchema);

    // Persist the extracted feature rows in batches.
    featureRows.apply(
        "Batch to Feature Table",
        BQWriteTransform.newBuilder()
            .setTableSpec(options.getTableSpec())
            .setBatchFrequency(options.getBatchFrequency())
            .setMethod(options.getWriteMethod())
            .setClusterFields(Util.getFeatureTableClusterFields())
            .setGcsTempLocation(StaticValueProvider.of(options.getCustomGcsTempLocation()))
            .build());

    // Score each feature row against the centroid side input and stream any
    // outliers straight to the outlier table.
    featureRows
        .apply(
            "Anomaly Detection",
            PredictTransform.newBuilder().setCentroidFeatureVector(centroidView).build())
        .apply(
            "Stream To Outlier Table",
            BQWriteTransform.newBuilder()
                .setTableSpec(options.getOutlierTableSpec())
                .setMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                .build());

    return pipeline.run();
  }