in sample-kinesis-producer/src/main/java/com/optimize/downstream/samplekinesisproducer/IOTDeviceConsumerFromBlockingQueueToKinesisStreams.java [36:61]
/**
 * Drains {@code inputQueue}: each device message is serialised to JSON with {@code gson},
 * wrapped in a {@link ByteBuffer} and handed to {@code pushToKinesis} with the device id
 * as the partition key. When the queue yields no more elements the JVM is terminated via
 * {@code System.exit(0)}.
 *
 * <p>A failure while processing one message is printed and the loop moves on to the next
 * message; the failed message is not retried.
 */
public void run()
{
    long threadId = Thread.currentThread().getId();
    System.out.println("Thread # " + threadId + " is doing this task");

    IOTDevice deviceMessage;
    // poll() atomically combines the emptiness check with the removal. The previous
    // isEmpty()-then-take() sequence could block forever in take() when another consumer
    // removed the last element between the two calls.
    while ((deviceMessage = inputQueue.poll()) != null)
    {
        try {
            String partitionKey = deviceMessage.getDeviceId();
            String json = gson.toJson(deviceMessage);
            // Encode explicitly as UTF-8 so the payload bytes do not depend on the
            // platform default charset of the host running the producer.
            ByteBuffer data = ByteBuffer.wrap(json.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            pushToKinesis(new PutRecordsRequestEntry().withPartitionKey(partitionKey).withData(data));
        } catch (Exception e) {
            // Best effort: report the failure and keep draining the remaining messages.
            e.printStackTrace();
        }
    }

    // NOTE(review): this ends the whole JVM as soon as this thread sees an empty queue.
    // If other consumer threads share the queue they may still be mid-send — confirm
    // this is the intended shutdown mechanism.
    System.exit(0);
}