public String handleRequest()

in lambda-kafka-function/src/main/java/com/amazonaws/hbase/lambda/KafkaHandler.java [78:116]


	/**
	 * Handles a batch of Kafka records delivered to the Lambda function.
	 *
	 * <p>For every record of every topic in the event, the record value is
	 * gzip-decompressed when {@code compressionEnabled} is set, Base64-decoded and
	 * deserialized into an {@link HBaseWALEntry}. For each cell of the entry whose
	 * type is {@code "put"} (case-insensitive), a score is obtained from
	 * {@code validator} and passed to {@code writeRecord} under the qualifier
	 * {@code "score"}, using the cell's own row and family. Cells of any other
	 * type are logged as {@code InvalidRecord}.
	 *
	 * <p>Failures while scoring or writing a single cell are logged and skipped.
	 * An {@link IOException} while decoding a record is logged with its stack
	 * trace and the record is skipped. Records without a value are logged and
	 * skipped.
	 *
	 * @param event   the Kafka event whose records are processed
	 * @param context the Lambda context; its logger is stored in {@code logger}
	 * @return always {@code "200 OK"}
	 */
	public String handleRequest(KafkaEvent event, Context context) {
		logger = context.getLogger();
		final String response = "200 OK";
		// Pin the charset for String <-> byte[] conversions instead of relying on
		// the platform default. Fully qualified so no new import is required.
		final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;

		Map<String, List<KafkaEventRecord>> topicRecords = event.getRecords();
		if (topicRecords == null) {
			// Nothing to process.
			return response;
		}

		for (List<KafkaEventRecord> records : topicRecords.values()) {
			for (KafkaEventRecord record : records) {
				String value = record.getValue();
				if (value == null) {
					// A record without a value would otherwise abort the
					// whole batch with a NullPointerException.
					logger.log("InvalidRecord: record has no value");
					continue;
				}
				try {
					byte[] rawBytes = value.getBytes(utf8);
					byte[] dataBytes = compressionEnabled ? gzipDecompress(rawBytes) : rawBytes;

					HBaseWALEntry entry = mapper.readValue(Base64.decode(dataBytes), HBaseWALEntry.class);
					for (HBaseCell c : (List<HBaseCell>) entry.getWalEdit().getCells()) {
						// Constant-first comparison: null-safe and locale-independent.
						if ("put".equalsIgnoreCase(c.getType())) {
							try {
								float score = validator.getScore(c);
								writeRecord(c.getRow(), c.getFamily(), "score".getBytes(utf8),
										Float.toString(score).getBytes(utf8));
								logger.log(new String(c.getRow()) + " speed:" + new String(c.getValue()) + " score: " + score);
							} catch (Exception e) {
								// We are ignoring invalid records.
								logger.log("InvalidRecord: " + e.getMessage());
							}
						} else {
							logger.log("InvalidRecord: " + new String(dataBytes, utf8));
						}
					}
				} catch (IOException e) {
					// Concatenating getStackTrace() directly would only print the
					// array's identity string; render the actual trace instead.
					java.io.StringWriter trace = new java.io.StringWriter();
					e.printStackTrace(new java.io.PrintWriter(trace, true));
					logger.log(e.getMessage() + " " + trace);
				}
			}
		}
		return response;
	}