in src/main/java/software/amazon/event/kafkaconnector/batch/DefaultEventBridgeBatching.java [122:144]
/**
 * Appends {@code item} to the batch at the current {@code index}, opening a fresh batch first
 * when the current one cannot take it.
 *
 * <p>A fresh batch is opened when either
 *
 * <ul>
 *   <li>the current batch is non-empty and adding the item would push the accumulated size above
 *       {@code MAX_BATCH_SIZE_BYTES}, or
 *   <li>the current batch already holds {@code MAX_BATCH_ITEMS} entries or more.
 * </ul>
 *
 * <p>An item whose own size is above {@code MAX_BATCH_SIZE_BYTES} is reported with a warning but
 * is still added to a batch.
 *
 * @param item the mapped record to append; its value is measured with {@code getSize}
 */
void accumulate(final MappedSinkRecord<PutEventsRequestEntry> item) {
  var entrySize = getSize(item.getValue());

  if (entrySize > MAX_BATCH_SIZE_BYTES) {
    // Report only: the oversized item is not dropped here, it is still batched below.
    var origin = item.getSinkRecord();
    logger.warn(
        "Item for SinkRecord with topic='{}', partition={} and offset={} exceeds EventBridge size limit. Size is {} bytes.",
        origin.topic(),
        origin.kafkaPartition(),
        origin.kafkaOffset(),
        entrySize);
  }

  var currentCount = batches.get(index).size();
  // The size check only rolls over a non-empty batch, so an empty batch always accepts the item.
  boolean sizeOverflow =
      (actualBatchSize + entrySize > MAX_BATCH_SIZE_BYTES) && (currentCount > 0);
  boolean countReached = currentCount >= MAX_BATCH_ITEMS;

  if (sizeOverflow || countReached) {
    // Open a new batch and restart the running size for it.
    batches.add(new ArrayList<>());
    index += 1;
    actualBatchSize = 0;
  }

  actualBatchSize += entrySize;
  batches.get(index).add(item);
}