in src/main/java/com/google/solutions/df/log/aggregations/common/MergeLogAggrMap.java [28:51]
/**
 * Flattens a nested aggregation row into a single row built with {@code Util.bqLogSchema}.
 *
 * <p>Reads {@code subscriberId} and {@code dstSubnet} from the nested {@code "key"} row, takes a
 * timestamp from {@code Util.getTimeStamp()}, and copies the count, min/max and average fields
 * from the nested {@code "value"} row. The three count fields are read as 64-bit values and
 * narrowed with {@code intValue()}.
 *
 * <p>NOTE(review): values are added positionally, so the order below presumably mirrors the field
 * order of {@code Util.bqLogSchema} -- confirm before reordering. The count fields are unboxed, so
 * a null count would throw {@link NullPointerException}; verify they are non-nullable upstream.
 *
 * @param input row containing nested {@code "key"} and {@code "value"} rows
 * @return the flattened row
 */
public Row apply(Row input) {
  // Resolve the nested rows once instead of repeating the lookup for every field.
  Row key = input.getRow("key");
  Row value = input.getRow("value");

  Row aggrRow =
      Row.withSchema(Util.bqLogSchema)
          .addValues(
              key.getString("subscriberId"),
              key.getString("dstSubnet"),
              Util.getTimeStamp(),
              value.getInt64("number_of_records").intValue(),
              value.getInt64("number_of_unique_ips").intValue(),
              value.getInt64("number_of_unique_ports").intValue(),
              value.getInt32("max_tx_bytes"),
              value.getInt32("min_tx_bytes"),
              value.getDouble("avg_tx_bytes"),
              value.getInt32("max_rx_bytes"),
              value.getInt32("min_rx_bytes"),
              value.getDouble("avg_rx_bytes"),
              value.getInt32("max_duration"),
              value.getInt32("min_duration"),
              value.getDouble("avg_duration"))
          .build();

  // Pass the row itself so its string form is only built when debug logging is enabled.
  LOG.debug("Aggr Row {}", aggrRow);
  return aggrRow;
}