in src/main/java/com/amazon/kinesis/kafka/FirehoseSinkTask.java [170:192]
/**
 * Converts the given sink records and hands them to {@code putRecordBatch} in batches.
 *
 * <p>A batch is flushed as soon as it holds {@code batchSize} records. A pending batch is
 * also flushed <em>before</em> a record is appended when appending it would push the
 * accumulated payload size past {@code batchSizeInBytes}; as a result a flushed batch only
 * exceeds the byte limit when it consists of a single record that is itself larger than the
 * limit. Records still pending after the loop are flushed as a final, partial batch.
 *
 * <p>Record size is measured with {@code record.getData().capacity()}.
 * NOTE(review): this presumably equals the payload length because the buffer wraps the
 * serialized bytes exactly - confirm against {@code DataUtility.createRecord}.
 *
 * @param sinkRecords records to convert and send; an empty collection results in no call
 *                    to {@code putRecordBatch}
 */
private void putRecordsInBatch(Collection<SinkRecord> sinkRecords) {
    List<Record> recordList = new ArrayList<Record>();
    // Tracked as long so the "would this record overflow the batch" sum below cannot wrap.
    long recordsSizeInBytes = 0;

    for (SinkRecord sinkRecord : sinkRecords) {
        Record record = DataUtility.createRecord(sinkRecord);
        int recordSize = record.getData().capacity();

        // Flush first if this record would not fit; checking only after the add (as the
        // previous implementation did) lets every size-triggered batch overshoot the limit.
        // The isEmpty() guard lets a single oversized record through in a batch of its own
        // instead of sending an empty batch.
        if (!recordList.isEmpty() && recordsSizeInBytes + recordSize > batchSizeInBytes) {
            putRecordBatch(recordList);
            recordList.clear();
            recordsSizeInBytes = 0;
        }

        recordList.add(record);
        recordsSizeInBytes += recordSize;

        // Count-based trigger: the batch is full once it holds batchSize records.
        if (recordList.size() == batchSize) {
            putRecordBatch(recordList);
            recordList.clear();
            recordsSizeInBytes = 0;
        }
    }

    // Send whatever is left over as a final, partial batch.
    if (!recordList.isEmpty()) {
        putRecordBatch(recordList);
    }
}