in src/main/java/com/google/solutions/df/log/aggregations/common/DLPTransform.java [142:173]
// Timer callback: reads every row buffered in the "elementsBag" state and emits the rows as
// batches, starting a new batch whenever adding the next row would push the batch's cumulative
// serialized size above batchSize.
//
// elementsBag - bag state holding the rows accumulated so far; it is only read here, not cleared.
// output      - receiver that gets one Iterable of rows per batch.
//
// Notes:
//  - Each emitted batch is a distinct list instance that is never modified after being handed to
//    output, so downstream consumers holding the reference cannot observe later changes.
//  - A batch is flushed only when it already contains rows, so no empty batch is emitted when a
//    single row on its own is larger than batchSize; such a row is emitted as a batch of one.
//  - numberOfRowsBagged is incremented by the row count of every emitted batch.
public void onTimer(
    @StateId("elementsBag") BagState<Table.Row> elementsBag,
    OutputReceiver<Iterable<Table.Row>> output) {
  int bufferSize = 0;
  List<Table.Row> rows = new ArrayList<>();
  for (Table.Row element : elementsBag.read()) {
    // Compute the size once per row; it is used for both the threshold check and the running sum.
    int elementSize = element.getSerializedSize();
    boolean clearBuffer = !rows.isEmpty() && bufferSize + elementSize > batchSize;
    if (clearBuffer) {
      numberOfRowsBagged.inc(rows.size());
      LOG.info("Clear Buffer {}", rows.size());
      output.output(rows);
      // Start a fresh list instead of clearing the one just emitted: the emitted list must stay
      // intact for whoever received it.
      rows = new ArrayList<>();
      bufferSize = 0;
    }
    rows.add(element);
    bufferSize += elementSize;
  }
  // Emit whatever is left after the last threshold-triggered flush.
  if (!rows.isEmpty()) {
    LOG.info("Remaining rows {}", rows.size());
    numberOfRowsBagged.inc(rows.size());
    output.output(rows);
  }
}