in src/main/java/com/google/solutions/df/log/aggregations/common/LogRowTransform.java [40:77]
/**
 * Aggregates log rows per {@code (subscriberId, dstSubnet)} pair and reshapes the result into
 * the {@code Util.bqLogSchema} layout.
 *
 * <p>Stages, in graph order:
 *
 * <ol>
 *   <li>"Add Columns": widen every row with {@code dstSubnet} ({@code STRING}) and
 *       {@code duration} ({@code INT32}).
 *   <li>"Create Aggr Row": map every widened row through {@code LogAggrMapElement}, keeping the
 *       widened schema. NOTE(review): presumably this is where the two new columns get their
 *       values; confirm in {@code LogAggrMapElement}.
 *   <li>"Group By SubId & DstSubNet": group on {@code subscriberId} and {@code dstSubnet}. It
 *       computes approximate distinct counts of {@code srcIP} and {@code srcPort} (sample size
 *       {@code SAMPLE_SIZE}), a record count over {@code srcIP}, and avg/max/min of
 *       {@code txBytes}, {@code rxBytes} and {@code duration}.
 *   <li>"Merge Aggr Row": map each grouped row through {@code MergeLogAggrMap} and attach
 *       {@code Util.bqLogSchema} to the output.
 * </ol>
 *
 * @param row input log rows. The schema must already contain {@code subscriberId},
 *     {@code srcIP}, {@code srcPort}, {@code txBytes} and {@code rxBytes}, because only
 *     {@code dstSubnet} and {@code duration} are added here. {@code txBytes}/{@code rxBytes}
 *     are presumably integer columns given {@code Max/Min.ofIntegers}; TODO confirm.
 * @return the aggregated rows as produced by {@code MergeLogAggrMap}, carrying
 *     {@code Util.bqLogSchema}
 */
public PCollection<Row> expand(PCollection<Row> row) {
  // Stage 1: append the grouping column (dstSubnet) and the aggregated column (duration).
  PCollection<Row> widened =
      row.apply(
          "Add Columns",
          AddFields.<Row>create()
              .field("dstSubnet", Schema.FieldType.STRING)
              .field("duration", Schema.FieldType.INT32));

  // Stage 2: per-row mapping. The widened schema is re-attached explicitly so the
  // schema-aware Group below can resolve field names on the mapped output.
  PCollection<Row> prepared =
      widened
          .apply("Create Aggr Row", MapElements.via(new LogAggrMapElement()))
          .setRowSchema(widened.getSchema());

  // Stage 3: keyed aggregation. The order of aggregateField calls is kept as-is because it
  // fixes the order of the output fields.
  PCollection<Row> grouped =
      prepared.apply(
          "Group By SubId & DstSubNet",
          Group.<Row>byFieldNames("subscriberId", "dstSubnet")
              // Approximate distinct counts, bounded by SAMPLE_SIZE.
              // NOTE(review): newer Beam releases deprecate ApproximateUnique in favour of
              // ApproximateCountDistinct; confirm against the Beam version in use.
              .aggregateField(
                  "srcIP",
                  new ApproximateUnique.ApproximateUniqueCombineFn<String>(
                      SAMPLE_SIZE, StringUtf8Coder.of()),
                  "number_of_unique_ips")
              .aggregateField(
                  "srcPort",
                  new ApproximateUnique.ApproximateUniqueCombineFn<Integer>(
                      SAMPLE_SIZE, VarIntCoder.of()),
                  "number_of_unique_ports")
              // Record count, taken over srcIP.
              .aggregateField("srcIP", Count.combineFn(), "number_of_records")
              // Transmitted bytes: avg / max / min.
              .aggregateField("txBytes", new AvgCombineFn(), "avg_tx_bytes")
              .aggregateField("txBytes", Max.ofIntegers(), "max_tx_bytes")
              .aggregateField("txBytes", Min.ofIntegers(), "min_tx_bytes")
              // Received bytes: avg / max / min.
              .aggregateField("rxBytes", new AvgCombineFn(), "avg_rx_bytes")
              .aggregateField("rxBytes", Max.ofIntegers(), "max_rx_bytes")
              .aggregateField("rxBytes", Min.ofIntegers(), "min_rx_bytes")
              // Duration (the INT32 column added in stage 1): avg / max / min.
              .aggregateField("duration", new AvgCombineFn(), "avg_duration")
              .aggregateField("duration", Max.ofIntegers(), "max_duration")
              .aggregateField("duration", Min.ofIntegers(), "min_duration"));

  // Stage 4: reshape the grouped rows into the output layout and attach its schema.
  return grouped
      .apply("Merge Aggr Row", MapElements.via(new MergeLogAggrMap()))
      .setRowSchema(Util.bqLogSchema);
}