in src/main/java/com/google/solutions/df/log/aggregations/common/fraud/detection/PredictTransform.java [184:211]
public void onTimer(
    @StateId("elementsBag") BagState<String> elementsBag, OutputReceiver<String> output) {
  // Drains the buffered elements and emits them in size-bounded batches.
  //
  // Elements are accumulated in read order. When adding the next element would push the
  // accumulated UTF-8 byte size above batchSize, the rows collected so far are emitted via
  // emitResult and a new batch is started with that element. Rows still pending after the
  // last element are emitted as a final batch.
  //
  // elementsBag: bag state holding the buffered elements.
  // output: receiver for the batched results.
  //
  // NOTE(review): the bag is not cleared in this method; confirm it is cleared elsewhere or
  // that this timer fires only once per key/window, otherwise elements may be re-emitted.
  List<String> rows = new ArrayList<>();
  int bufferSize = 0;
  for (String element : elementsBag.read()) {
    // Measure with an explicit charset so batching does not depend on the worker's platform
    // default encoding; encode once and reuse the result.
    int elementSize = element.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
    if (bufferSize + elementSize > batchSize) {
      LOG.debug("Clearing Rows {}", rows.size());
      // rows can be empty here when the very first element alone exceeds batchSize.
      if (!rows.isEmpty()) {
        output.output(emitResult(rows, rows.size()));
      }
      rows.clear();
      bufferSize = 0;
    }
    // The current element always joins the (possibly just reset) batch.
    rows.add(element);
    bufferSize += elementSize;
  }
  if (!rows.isEmpty()) {
    LOG.debug("Remaning Rows {}", rows.size());
    output.output(emitResult(rows, rows.size()));
  }
}