in kinesis-lambda/src/main/java/com/optimize/downstream/lambda/ProcessKinesisRecords.java [38:77]
/**
 * Handles a batch of Kinesis records: for each record, decodes its payload as
 * text, merges it with the JSON form of a freshly generated
 * {@code AdditionalIOTData} object, and sends the merged string to Firehose.
 *
 * <p>Processing is best-effort. A failure while merging or sending one record
 * is printed and the loop moves on to the next record. Any other failure
 * (for example while decoding a payload) stops the loop and is printed.
 * Nothing is rethrown to the caller.
 *
 * @param event   the Kinesis event whose records are processed
 * @param context the Lambda invocation context (not used by this method)
 * @return always {@code null}
 */
public Void handleRequest(KinesisEvent event, Context context)
{
    gson = new Gson();
    ClientConfiguration config = new ClientConfiguration();
    config.setMaxErrorRetry(5);
    // NOTE(review): the AWS SDK documents this value as milliseconds, so 100 is a
    // 100 ms socket timeout, which looks very low for a network call - confirm intent.
    config.setSocketTimeout(100);
    kinesisFirehoseClient = new AmazonKinesisFirehoseClient(config);
    kinesisFirehoseClient.setRegion(Region.getRegion(Regions.fromName(REGION)));
    try {
        for (KinesisEventRecord rec : event.getRecords())
        {
            // Decode only the buffer's readable bytes, explicitly as UTF-8.
            // ByteBuffer.array() would expose the entire backing array (ignoring
            // position/limit/offset) and fails for read-only or direct buffers;
            // duplicate() keeps the record's own buffer position untouched.
            String jsonMessage = java.nio.charset.StandardCharsets.UTF_8
                    .decode(rec.getKinesis().getData().duplicate())
                    .toString();
            AdditionalIOTData additionalDeviceMessage = generateAdditionalIOTDeviceData();
            String addJson = gson.toJson(additionalDeviceMessage);
            try {
                String mergedJsonString = mergeJsonStrings(jsonMessage, addJson);
                System.out.println(mergedJsonString);
                System.out.println("Sending record to Firehose");
                String recordId = sendToFireHose(mergedJsonString);
                System.out.println("Record sent to Firehose. Result Record Id is : " + recordId);
            }
            catch (Exception e)
            {
                // Best-effort per record: report the failure and continue with the next one.
                e.printStackTrace();
            }
        }
    }
    catch (Exception ie)
    {
        // Previously ie.getStackTrace() was called and its result discarded,
        // which hid the failure entirely; print it so it shows up in the logs.
        ie.printStackTrace();
    }
    return null;
}