public String handleRequest()

in lambda-kinesis-function/src/main/java/com/amazonaws/hbase/lambda/KinesisHandler.java [78:113]


	public String handleRequest(KinesisEvent event, Context context) {
		logger =  context.getLogger();
		String response = new String("200 OK");
		List<KinesisEventRecord>  records = event.getRecords();
		//logger.log("Record count: " + records.size());
		for (KinesisEventRecord record : records) {
			try {
				byte[] dataBytes;
				if (compressionEnabled) {
					dataBytes = gzipDecompress(record.getKinesis().getData().array());
				}  else {
					dataBytes = record.getKinesis().getData().array();
				}
				HBaseWALEntry entry = mapper.readValue(dataBytes,HBaseWALEntry.class);
				for (HBaseCell c : (List<HBaseCell>)entry.getWalEdit().getCells())  {
					if (c.getType().toLowerCase().compareTo("put")==0) {
						try {
							float score = validator.getScore(c);
							writeRecord(c.getRow(),DEFAULT_HBASE_ENRICH_CULUMNFAMILY.getBytes(),"score".getBytes(), new Float(score).toString().getBytes());
							logger.log(new String(c.getRow())+ " speed:" + new String(c.getValue()) + " score: "+ score );
							
						} catch (Exception e) {
							// We are ignoring invalid records.
							logger.log("InvalidRecord: " + e.getMessage());
						}
					} else {
						logger.log("InvalidRecord: " + new String (dataBytes));
					}
				}
			} catch (IOException e) {
				// TODO Auto-generated catch block
				logger.log(e.getMessage() + " " + e.getStackTrace());
			}
		}
		return response;
	}