in java/KinesisLambdaForwarder/src/main/java/com/amazonaws/kinesis/forwarder/LambdaAggregatingForwarder.java [117:151]
public Void handleRequest(KinesisEvent input, Context context)
{
LambdaLogger logger = context.getLogger();
logger.log("Received " + input.getRecords().size() + " raw Kinesis records.");
try
{
//Allows us to receive and process Kinesis aggregated records, but can also process normal
//non-aggregated records without an issue (deaggregation is a no-op in the latter scenario)
List<UserRecord> userRecords = RecordDeaggregator.deaggregate(input.getRecords());
logger.log("Received " + userRecords.size() + " deaggregated Kinesis records.");
for (UserRecord userRecord : userRecords)
{
try
{
AggRecord aggRecord = this.aggregator.addUserRecord(userRecord);
checkAndForwardRecords(logger, aggRecord);
}
catch(Exception e)
{
logger.log("[ERROR] Could not add user record: " + e.getMessage());
}
}
checkAndForwardRecords(logger, this.aggregator.clearAndGet());
}
catch (Exception e)
{
logger.log("Lambda function encountered fatal error: " + e.getMessage());
}
return null;
}