in src/main/java/com/google/solutions/df/log/aggregations/SecureLogAggregationPipeline.java [61:148]
/**
 * Builds and runs the secure log aggregation pipeline.
 *
 * <p>The pipeline: (1) loads the latest cluster centroid vectors from BigQuery as a side input,
 * (2) reads flow-log rows from GCS and Pub/Sub, (3) optionally persists raw logs enriched with
 * the IP's country of origin, (4) windows the rows, extracts features and applies DLP
 * de-identification, (5) writes the feature rows to BigQuery, and (6) scores each feature row
 * against the centroids and streams detected outliers to BigQuery.
 *
 * @param options pipeline options carrying table specs, templates, rates and window settings
 * @return the {@link PipelineResult} of the launched pipeline
 */
public static PipelineResult run(SecureLogAggregationPipelineOptions options) {
  Pipeline p = Pipeline.create(options);

  // Side input: centroid id, radius and other features from the latest normalized data.
  PCollectionView<List<CentroidVector>> centroidFeatures = readCentroidFeatures(p, options);

  // Read flow-log rows from GCS (file pattern, polled) and Pub/Sub (subscriber).
  PCollection<Row> maybeTokenizedRows =
      p.apply(
          "Read FlowLog Data",
          ReadFlowLogTransform.newBuilder()
              .setFilePattern(options.getInputFilePattern())
              .setPollInterval(DEFAULT_POLL_INTERVAL)
              .setSubscriber(options.getSubscriberId())
              .build());

  // If a raw-log table is configured, store raw log data with the country where the IP
  // originated.
  if (options.getLogTableSpec() != null) {
    writeRawLogsWithGeo(maybeTokenizedRows, options);
  }

  PCollection<Row> featureExtractedRows = extractAndDeidentifyFeatures(maybeTokenizedRows, options);

  writeFeatureRows(featureExtractedRows, options);

  // Prediction - let's have some fun.
  detectAndWriteOutliers(featureExtractedRows, centroidFeatures, options);

  return p.run();
}

/** Reads the latest normalized centroid vectors from BigQuery as a list-valued side input. */
private static PCollectionView<List<CentroidVector>> readCentroidFeatures(
    Pipeline p, SecureLogAggregationPipelineOptions options) {
  return p.apply(
          "Latest Normalized Data",
          BigQueryIO.read(new ClusterDataMapElement())
              .fromQuery(Util.getClusterDetails(options.getClusterQuery()))
              .usingStandardSql()
              .withMethod(Method.EXPORT))
      .apply("Centroids Data as Input", View.asList());
}

/** Enriches raw log rows with IP geolocation and batch-writes them to the raw log table. */
private static void writeRawLogsWithGeo(
    PCollection<Row> rows, SecureLogAggregationPipelineOptions options) {
  rows.apply(
          "ConvertIpToGeo",
          RawLogDataTransform.newBuilder().setDbPath(options.getGeoDbpath()).build())
      .setRowSchema(Util.networkLogSchemaWithGeo)
      .apply(
          "Batch to Log Table",
          BQWriteTransform.newBuilder()
              .setTableSpec(options.getLogTableSpec())
              .setBatchFrequency(options.getBatchFrequency())
              .setMethod(options.getWriteMethod())
              .setClusterFields(Util.getRawTableClusterFields())
              .setGcsTempLocation(StaticValueProvider.of(options.getCustomGcsTempLocation()))
              .build());
}

/**
 * Windows the rows into fixed intervals, extracts aggregate features, and applies DLP
 * de-identification before assigning the BigQuery log schema.
 */
private static PCollection<Row> extractAndDeidentifyFeatures(
    PCollection<Row> rows, SecureLogAggregationPipelineOptions options) {
  return rows.apply(
          "Fixed Window",
          Window.<Row>into(FixedWindows.of(Duration.standardSeconds(options.getWindowInterval())))
              // Fire once at the watermark; late data is dropped (allowed lateness is zero).
              .triggering(AfterWatermark.pastEndOfWindow())
              .discardingFiredPanes()
              .withAllowedLateness(Duration.ZERO))
      .apply("Feature Extraction", new LogRowTransform())
      .apply(
          "DLP Transformation",
          DLPTransform.newBuilder()
              .setBatchSize(options.getBatchSize())
              .setDeidTemplateName(options.getDeidTemplateName())
              .setInspectTemplateName(options.getInspectTemplateName())
              .setProjectId(options.getProject())
              // NOTE(review): Math.floorDiv throws ArithmeticException if getMaxNumWorkers()
              // is 0 — presumably the runner guarantees a positive worker count; verify.
              .setRandomKey(
                  Util.randomKeyGenerator(
                      Math.floorDiv(
                          options.getIngestionRate().intValue(), options.getMaxNumWorkers())))
              .build())
      .setRowSchema(Util.bqLogSchema);
}

/** Batch-writes the extracted feature rows to the feature table. */
private static void writeFeatureRows(
    PCollection<Row> featureExtractedRows, SecureLogAggregationPipelineOptions options) {
  featureExtractedRows.apply(
      "Batch to Feature Table",
      BQWriteTransform.newBuilder()
          .setTableSpec(options.getTableSpec())
          .setBatchFrequency(options.getBatchFrequency())
          .setMethod(options.getWriteMethod())
          .setClusterFields(Util.getFeatureTableClusterFields())
          .setGcsTempLocation(StaticValueProvider.of(options.getCustomGcsTempLocation()))
          .build());
}

/**
 * Scores each feature row against the centroid side input and streams detected outliers to the
 * outlier table via streaming inserts (low latency, unlike the batched feature/log writes).
 */
private static void detectAndWriteOutliers(
    PCollection<Row> featureExtractedRows,
    PCollectionView<List<CentroidVector>> centroidFeatures,
    SecureLogAggregationPipelineOptions options) {
  featureExtractedRows
      .apply(
          "Anomaly Detection",
          PredictTransform.newBuilder().setCentroidFeatureVector(centroidFeatures).build())
      .apply(
          "Stream To Outlier Table",
          BQWriteTransform.newBuilder()
              .setTableSpec(options.getOutlierTableSpec())
              .setMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
              .build());
}