lambda-kafka-function/src/main/java/com/amazonaws/hbase/lambda/KafkaHandler.java [100:172]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
								logger.log(new String(c.getRow())+ " speed:" + new String(c.getValue()) + " score: "+ score );
							
							} catch (Exception e) {
								// We are ignoring invalid records.
								logger.log("InvalidRecord: " + e.getMessage());
							}
						} else {
							logger.log("InvalidRecord: " + new String (dataBytes));
						}
					}	
				} catch (IOException e) {
					logger.log(e.getMessage() + " " + e.getStackTrace());
				}
			}
		}
		return response;
	}
	
	/**
	 * Decompress byte from Gzip
	 * 
	 * @param bytes	bytes to decompress
	 * @return 	returns bytes
	 * @throws IOException
	 */
	private byte[] gzipDecompress(byte[] bytes) throws IOException {
		ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
		GZIPInputStream zipStream = new GZIPInputStream(byteStream);
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		
		byte[] buffer = new byte[1024];
		int len;
		while ((len = zipStream.read(buffer)) > 0) {
		    out.write(buffer, 0, len);
		}
		zipStream.close();
		out.close();
		
		return out.toByteArray();
	}
	
	public void writeRecord(byte[] rowKey,byte[] cFamily, byte[] qualifier, byte[] value ) throws Exception {
			Put p = new Put(rowKey);
			p.addColumn(cFamily, qualifier, value);
			table.put(p);
	}
	
	private String getMasterIntanceGroupId(AmazonElasticMapReduce emr, String clusterid) {
		ListInstanceGroupsRequest listInstanceGroupsRequest = new ListInstanceGroupsRequest();
		listInstanceGroupsRequest.setClusterId(clusterid);
		List<InstanceGroup> instanceGroups = emr.listInstanceGroups(listInstanceGroupsRequest).getInstanceGroups();
		for (InstanceGroup instanceGroup: instanceGroups) {
			if (instanceGroup.getInstanceGroupType().contentEquals(InstanceGroupType.MASTER.toString())) {
				return instanceGroup.getId();
			}
		}
		return null;
	}
	
	public String getMasterInstanceIpAddress(String clusterid, String region) {
		
		AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
				.withCredentials(DefaultAWSCredentialsProviderChain.getInstance())
				.withRegion(Regions.fromName(region))
				.build();
		
		String masterInstanceGroupId = getMasterIntanceGroupId(emr,clusterid);
		ListInstancesRequest listInstancesRequest = new ListInstancesRequest();
		listInstancesRequest.setInstanceGroupId(masterInstanceGroupId);
		listInstancesRequest.setClusterId(clusterid);
		
		List<Instance> instances = emr.listInstances(listInstancesRequest).getInstances();
		return instances.get(0).getPrivateDnsName();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



lambda-kinesis-function/src/main/java/com/amazonaws/hbase/lambda/KinesisHandler.java [97:168]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
							logger.log(new String(c.getRow())+ " speed:" + new String(c.getValue()) + " score: "+ score );
							
						} catch (Exception e) {
							// We are ignoring invalid records.
							logger.log("InvalidRecord: " + e.getMessage());
						}
					} else {
						logger.log("InvalidRecord: " + new String (dataBytes));
					}
				}
			} catch (IOException e) {
				// TODO Auto-generated catch block
				logger.log(e.getMessage() + " " + e.getStackTrace());
			}
		}
		return response;
	}
	
	/**
	 * Decompress byte from Gzip
	 * 
	 * @param bytes	bytes to decompress
	 * @return 	returns bytes
	 * @throws IOException
	 */
	private byte[] gzipDecompress(byte[] bytes) throws IOException {
		ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
		GZIPInputStream zipStream = new GZIPInputStream(byteStream);
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		
		byte[] buffer = new byte[1024];
		int len;
		while ((len = zipStream.read(buffer)) > 0) {
		    out.write(buffer, 0, len);
		}
		zipStream.close();
		out.close();
		
		return out.toByteArray();
	}
	
	public void writeRecord(byte[] rowKey,byte[] cFamily, byte[] qualifier, byte[] value ) throws Exception {
			Put p = new Put(rowKey);
			p.addColumn(cFamily, qualifier, value);
			table.put(p);
	}
	
	private String getMasterIntanceGroupId(AmazonElasticMapReduce emr, String clusterid) {
		ListInstanceGroupsRequest listInstanceGroupsRequest = new ListInstanceGroupsRequest();
		listInstanceGroupsRequest.setClusterId(clusterid);
		List<InstanceGroup> instanceGroups = emr.listInstanceGroups(listInstanceGroupsRequest).getInstanceGroups();
		for (InstanceGroup instanceGroup: instanceGroups) {
			if (instanceGroup.getInstanceGroupType().contentEquals(InstanceGroupType.MASTER.toString())) {
				return instanceGroup.getId();
			}
		}
		return null;
	}
	
	public String getMasterInstanceIpAddress(String clusterid, String region) {
		AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
				.withCredentials(DefaultAWSCredentialsProviderChain.getInstance())
				.withRegion(Regions.fromName(region))
				.build();
		
		String masterInstanceGroupId = getMasterIntanceGroupId(emr,clusterid);
		ListInstancesRequest listInstancesRequest = new ListInstancesRequest();
		listInstancesRequest.setInstanceGroupId(masterInstanceGroupId);
		listInstancesRequest.setClusterId(clusterid);
		
		List<Instance> instances = emr.listInstances(listInstancesRequest).getInstances();
		return instances.get(0).getPrivateDnsName();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



