public void onTimer()

in src/main/java/com/google/solutions/df/log/aggregations/common/DLPTransform.java [142:173]


    /**
     * Reads every row buffered in {@code elementsBag} and emits them as batches.
     *
     * <p>Rows are accumulated in iteration order. Before adding a row that would push the
     * cumulative serialized size of the current batch above {@code batchSize}, the current batch is
     * emitted and a new one is started. A single row larger than {@code batchSize} is therefore
     * emitted in a batch of its own. Any rows left over after the loop are emitted as a final
     * batch. The {@code numberOfRowsBagged} counter is incremented by the size of each emitted
     * batch.
     *
     * <p>NOTE(review): this method does not clear {@code elementsBag}; confirm that the state is
     * cleared or garbage-collected elsewhere.
     *
     * @param elementsBag bag state holding the rows buffered so far
     * @param output receiver for the emitted batches
     */
    public void onTimer(
        @StateId("elementsBag") BagState<Table.Row> elementsBag,
        OutputReceiver<Iterable<Table.Row>> output) {
      List<Table.Row> rows = new ArrayList<>();
      int bufferSize = 0;

      for (Table.Row element : elementsBag.read()) {
        int elementSize = element.getSerializedSize();
        // Flush only when there is something to flush: an oversized first row must not
        // cause an empty batch to be emitted.
        if (bufferSize + elementSize > batchSize && !rows.isEmpty()) {
          numberOfRowsBagged.inc(rows.size());
          LOG.info("Clear Buffer {}", rows.size());
          output.output(rows);
          // Start a fresh list instead of clearing the emitted one: a value must not be
          // mutated after it has been output, otherwise downstream may observe the batch
          // being emptied/refilled.
          rows = new ArrayList<>();
          bufferSize = 0;
        }
        rows.add(element);
        bufferSize += elementSize;
      }

      if (!rows.isEmpty()) {
        LOG.info("Remaining rows {}", rows.size());
        numberOfRowsBagged.inc(rows.size());
        output.output(rows);
      }
    }