void accumulate()

in src/main/java/software/amazon/event/kafkaconnector/batch/DefaultEventBridgeBatching.java [122:144]


    void accumulate(final MappedSinkRecord<PutEventsRequestEntry> item) {

      var itemSize = getSize(item.getValue());
      if (itemSize > MAX_BATCH_SIZE_BYTES) {
        var sinkRecord = item.getSinkRecord();
        logger.warn(
            "Item for SinkRecord with topic='{}', partition={} and offset={} exceeds EventBridge size limit. Size is {} bytes.",
            sinkRecord.topic(),
            sinkRecord.kafkaPartition(),
            sinkRecord.kafkaOffset(),
            itemSize);
      }
      var actualBatchItems = batches.get(index).size();
      if ((actualBatchSize + itemSize > MAX_BATCH_SIZE_BYTES) && (actualBatchItems > 0)
          || (actualBatchItems >= MAX_BATCH_ITEMS)) {
        batches.add(new ArrayList<>());
        index++;
        actualBatchSize = 0;
      }

      actualBatchSize += itemSize;
      batches.get(index).add(item);
    }