private void pushToKinesis()

in sample-kinesis-producer/src/main/java/com/optimize/downstream/samplekinesisproducer/IOTDeviceConsumerFromBlockingQueueToKinesisStreams.java [70:96]


    /**
     * Buffers a record for the next batched write to the Kinesis stream, flushing the
     * pending batch first when adding the record would push it past the limits applied
     * here: 5 MiB of data plus partition keys, or 500 records.
     *
     * <p>Reads and updates the {@code entries} and {@code dataSize} fields and may call
     * {@code flush()}. NOTE(review): this assumes {@code flush()} sends the buffered
     * records and leaves {@code entries} empty -- confirm against its implementation.
     *
     * @param entry the record to buffer; its data and partition key are dereferenced
     *              and must therefore be non-null
     * @throws IllegalArgumentException if the record on its own is larger than the
     *         batch size limit, so it could never be placed in any batch
     */
    private void pushToKinesis(PutRecordsRequestEntry entry)
    {
        final int maxBatchBytes = 5 * 1024 * 1024;
        final int maxBatchRecords = 500;

        // Measure the partition key in UTF-8 bytes rather than UTF-16 code units so that
        // non-ASCII keys are not undercounted against the byte limit.
        final int entryBytes = entry.getData().remaining()
                + entry.getPartitionKey().getBytes(java.nio.charset.StandardCharsets.UTF_8).length;

        // A record that cannot fit even into an empty batch used to trigger endless
        // flush-and-retry recursion; reject it explicitly instead.
        if (entryBytes > maxBatchBytes)
        {
            throw new IllegalArgumentException("Record of " + entryBytes
                    + " bytes exceeds the maximum batch size of " + maxBatchBytes + " bytes");
        }

        // Send the pending batch first if this record would overflow either limit.
        if (dataSize + entryBytes > maxBatchBytes || entries.size() >= maxBatchRecords)
        {
            flush();
            // dataSize is accumulated in bytes, so report it as bytes.
            System.out.println("Record sent to Kinesis Stream. Record size is ::: " + dataSize + " bytes");
            dataSize = 0;
        }

        dataSize += entryBytes;
        entries.add(entry);
    }