in kinesis-lambda/src/main/java/com/optimize/downstream/lambda/ProcessKinesisRecords.java [145:167]
private String sendToFireHose(String mergedJsonString)
{
PutRecordResult res = null;
try {
//To Firehose -
System.out.println("MESSAGE SIZE BEFORE COMPRESSION IS : " + mergedJsonString.toString().getBytes(charset).length);
System.out.println("MESSAGE SIZE AFTER GZIP COMPRESSION IS : " + compressMessage(mergedJsonString.toString().getBytes(charset)).length);
PutRecordRequest req = new PutRecordRequest()
.withDeliveryStreamName(firehoseStreamName);
// Without compression - Send to Firehose
//Record record = new Record().withData(ByteBuffer.wrap((mergedJsonString.toString() + "\r\n").getBytes()));
// With compression - send to Firehose
Record record = new Record().withData(ByteBuffer.wrap(compressMessage((mergedJsonString.toString() + "\r\n").getBytes())));
req.setRecord(record);
res = kinesisFirehoseClient.putRecord(req);
}
catch (IOException ie) {
ie.printStackTrace();
}
return res.getRecordId();
}