in lambda-kinesis-function/src/main/java/com/amazonaws/hbase/lambda/KinesisHandler.java [78:113]
/**
 * Processes a batch of Kinesis records that carry serialized HBase WAL entries.
 *
 * <p>For each record the payload is gzip-decompressed when {@code compressionEnabled} is set,
 * deserialized with {@code mapper} into an {@code HBaseWALEntry}, and every cell whose type is
 * "put" (case-insensitive) is scored with {@code validator.getScore} and persisted through
 * {@code writeRecord} under the {@code "score"} qualifier. Cells of any other type are logged
 * as invalid. Failures are logged and do not abort the batch: a scoring/writing failure skips
 * only that cell, an {@link IOException} while decoding skips only that record.
 *
 * @param event   the Kinesis batch delivered to the function
 * @param context the invocation context; its logger is stored in the {@code logger} field
 * @return always {@code "200 OK"}, including when individual records or cells were skipped
 */
public String handleRequest(KinesisEvent event, Context context) {
    logger = context.getLogger();
    final String response = "200 OK";
    final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;

    List<KinesisEventRecord> records = event.getRecords();
    if (records == null) {
        // Nothing to process; avoid a NullPointerException in the loop below.
        return response;
    }

    for (KinesisEventRecord record : records) {
        try {
            // Copy only the readable window of the buffer. ByteBuffer.array() would expose the
            // whole backing array (ignoring position/limit/offset) and fails on read-only
            // buffers. duplicate() keeps the caller's buffer position untouched.
            java.nio.ByteBuffer payload = record.getKinesis().getData().duplicate();
            byte[] rawBytes = new byte[payload.remaining()];
            payload.get(rawBytes);

            byte[] dataBytes = compressionEnabled ? gzipDecompress(rawBytes) : rawBytes;

            HBaseWALEntry entry = mapper.readValue(dataBytes, HBaseWALEntry.class);
            for (HBaseCell c : (List<HBaseCell>) entry.getWalEdit().getCells()) {
                // Constant-first comparison: locale-independent and tolerant of a null type,
                // which is then reported through the "InvalidRecord" branch.
                if ("put".equalsIgnoreCase(c.getType())) {
                    try {
                        float score = validator.getScore(c);
                        // NOTE(review): the column-family constant and the cell row/value are
                        // converted with the platform charset as before; their declared types
                        // are not visible from this method.
                        writeRecord(
                                c.getRow(),
                                DEFAULT_HBASE_ENRICH_CULUMNFAMILY.getBytes(),
                                "score".getBytes(utf8),
                                Float.toString(score).getBytes(utf8));
                        logger.log(new String(c.getRow()) + " speed:" + new String(c.getValue()) + " score: " + score);
                    } catch (Exception e) {
                        // Invalid records are deliberately skipped; log and continue.
                        logger.log("InvalidRecord: " + e.getMessage());
                    }
                } else {
                    logger.log("InvalidRecord: " + new String(dataBytes, utf8));
                }
            }
        } catch (IOException e) {
            // Render the frames explicitly; concatenating the array itself only prints its identity.
            logger.log(e.getMessage() + " " + java.util.Arrays.toString(e.getStackTrace()));
        }
    }
    return response;
}