in lambda-kafka-function/src/main/java/com/amazonaws/hbase/lambda/KafkaHandler.java [78:116]
public String handleRequest(KafkaEvent event, Context context) {
logger = context.getLogger();
String response = new String("200 OK");
Map<String,List<KafkaEventRecord>> topicRecords = event.getRecords();
//logger.log("Record count: " + rec.size());
for (String topic : topicRecords.keySet()) {
for ( KafkaEventRecord record: topicRecords.get(topic)) {
try {
byte[] dataBytes;
if (compressionEnabled ==true) {
dataBytes = gzipDecompress(record.getValue().getBytes());
} else {
dataBytes = record.getValue().getBytes();
}
HBaseWALEntry entry = mapper.readValue(Base64.decode(dataBytes),HBaseWALEntry.class);
for (HBaseCell c : (List<HBaseCell>)entry.getWalEdit().getCells()) {
if (c.getType().toLowerCase().compareTo("put")==0) {
try {
float score = validator.getScore(c);
writeRecord(c.getRow(),c.getFamily(),"score".getBytes(), new Float(score).toString().getBytes());
logger.log(new String(c.getRow())+ " speed:" + new String(c.getValue()) + " score: "+ score );
} catch (Exception e) {
// We are ignoring invalid records.
logger.log("InvalidRecord: " + e.getMessage());
}
} else {
logger.log("InvalidRecord: " + new String (dataBytes));
}
}
} catch (IOException e) {
logger.log(e.getMessage() + " " + e.getStackTrace());
}
}
}
return response;
}