in sample-kinesis-producer/src/main/java/com/optimize/downstream/samplekinesisproducer/IOTDeviceConsumerFromBlockingQueueToKinesisStreams.java [70:96]
private void pushToKinesis(PutRecordsRequestEntry entry)
{
/*System.out.println("===================================================================");
System.out.println("Data Size is : " + dataSize);
System.out.println("Remaining Data is : " + entry.getData().remaining());
System.out.println("Partition Key length is : " + entry.getPartitionKey().length());*/
int newDataSize = dataSize + entry.getData().remaining() +
entry.getPartitionKey().length();
if (newDataSize <= 5 * 1024 * 1024 && entries.size() < 500)
{
dataSize = newDataSize;
entries.add(entry);
//System.out.println("Data size is : " + dataSize );
}
else {
//System.out.println("In Else : Entries size is : " + entries.size() + " --- New Data size is ::: " + newDataSize);
//System.out.println("Sending records to Kinesis Stream... Size is ::: " + dataSize);
/*kinesis.putRecords(new PutRecordsRequest()
.withStreamName(STREAM_NAME)
.withRecords(entry));*/
flush();
System.out.println("Record sent to Kinesis Stream. Record size is ::: " + dataSize + " KB");
dataSize = 0;
pushToKinesis(entry);
}
}