public PCollection expand()

in src/main/java/com/google/solutions/df/log/aggregations/common/LogRowTransform.java [40:77]


  /**
   * Builds the per-subscriber / per-destination-subnet aggregation pipeline.
   *
   * <p>The stages, in order:
   *
   * <ol>
   *   <li>extend the input schema with a {@code dstSubnet} (STRING) and a {@code duration}
   *       (INT32) field;
   *   <li>map every row through {@code LogAggrMapElement} and re-attach the extended schema;
   *   <li>group by {@code subscriberId} and {@code dstSubnet}, computing approximate distinct
   *       counts of {@code srcIP} and {@code srcPort}, a record count, and avg/max/min of
   *       {@code txBytes}, {@code rxBytes} and {@code duration};
   *   <li>map every grouped row through {@code MergeLogAggrMap} and attach
   *       {@code Util.bqLogSchema} as the output schema.
   * </ol>
   *
   * @param row the input collection of schema'd rows
   * @return the aggregated rows carrying {@code Util.bqLogSchema}
   */
  public PCollection<Row> expand(PCollection<Row> row) {

    // Stage 1: widen the schema with the two columns used as group key / aggregation input.
    PCollection<Row> widenedRows =
        row.apply(
            "Add Columns",
            AddFields.<Row>create()
                .field("dstSubnet", Schema.FieldType.STRING)
                .field("duration", Schema.FieldType.INT32));

    // Stage 2: per-element mapping. MapElements does not carry the schema over, so the
    // widened schema is set explicitly on the result.
    // NOTE(review): presumably LogAggrMapElement fills in the new columns - confirm there.
    PCollection<Row> preparedRows =
        widenedRows
            .apply("Create Aggr Row", MapElements.via(new LogAggrMapElement()))
            .setRowSchema(widenedRows.getSchema());

    // Stage 3: one output row per (subscriberId, dstSubnet) pair with all aggregates.
    PCollection<Row> groupedRows =
        preparedRows.apply(
            "Group By SubId & DstSubNet",
            Group.<Row>byFieldNames("subscriberId", "dstSubnet")
                // Approximate distinct counts, both bounded by SAMPLE_SIZE.
                .aggregateField(
                    "srcIP",
                    new ApproximateUnique.ApproximateUniqueCombineFn<String>(
                        SAMPLE_SIZE, StringUtf8Coder.of()),
                    "number_of_unique_ips")
                .aggregateField(
                    "srcPort",
                    new ApproximateUnique.ApproximateUniqueCombineFn<Integer>(
                        SAMPLE_SIZE, VarIntCoder.of()),
                    "number_of_unique_ports")
                // Record count, taken over the srcIP field.
                .aggregateField("srcIP", Count.combineFn(), "number_of_records")
                // Transmitted bytes statistics.
                .aggregateField("txBytes", new AvgCombineFn(), "avg_tx_bytes")
                .aggregateField("txBytes", Max.ofIntegers(), "max_tx_bytes")
                .aggregateField("txBytes", Min.ofIntegers(), "min_tx_bytes")
                // Received bytes statistics.
                .aggregateField("rxBytes", new AvgCombineFn(), "avg_rx_bytes")
                .aggregateField("rxBytes", Max.ofIntegers(), "max_rx_bytes")
                .aggregateField("rxBytes", Min.ofIntegers(), "min_rx_bytes")
                // Duration statistics.
                .aggregateField("duration", new AvgCombineFn(), "avg_duration")
                .aggregateField("duration", Max.ofIntegers(), "max_duration")
                .aggregateField("duration", Min.ofIntegers(), "min_duration"));

    // Stage 4: convert the grouped result into the final row layout.
    PCollection<Row> mergedRows =
        groupedRows.apply("Merge Aggr Row", MapElements.via(new MergeLogAggrMap()));

    return mergedRows.setRowSchema(Util.bqLogSchema);
  }