public void putRecord()

in kinesis-sink/src/main/java/com/amazonaws/hbase/datasink/KinesisDataSinkImpl.java [146:182]


	/**
	 * Publishes one record to the Kinesis stream that corresponds to {@code tablename}.
	 *
	 * <p>The producer is created on the first call. The partition key handed to the
	 * producer is the Base64 encoding of {@code md}'s digest over the UTF-8 bytes of
	 * {@code partition}. When synchronous puts are enabled the call blocks until the
	 * producer reports a result; otherwise the put is handed off and
	 * {@code putRecordCallback} is registered on {@code executor}.
	 *
	 * <p>NOTE(review): the lazy creation of {@code kinesis} and the use of the shared
	 * {@code md} are not synchronized in this method; if callers can invoke it
	 * concurrently both need guarding - confirm against the call sites.
	 *
	 * @param buffer    payload passed to the producer as the record data
	 * @param tablename table name, resolved to a stream name through the configuration util
	 * @param partition value that is digested to form the partition key; must not be null
	 * @throws IOException          if a synchronous put completes but is not successful
	 * @throws InterruptedException if the thread is interrupted while waiting for a synchronous put
	 * @throws ExecutionException   if the future of a synchronous put completes exceptionally
	 */
	public void putRecord(ByteBuffer buffer, String tablename,String partition) throws IOException, InterruptedException, ExecutionException {
		if (kinesis == null) { // creating the producer when there is a request.
			KinesisProducerConfiguration config = configUtil.getKPLConfiguration();
			LOG.debug("First Time producer. endpoint {} port: {}", config.getKinesisEndpoint(), config.getKinesisPort());
			this.kinesis = KinesisProducerFactory.getProducer(config);
		}

		// Explicit charset: the partition key must not depend on the JVM's default encoding.
		md.update(partition.getBytes(java.nio.charset.StandardCharsets.UTF_8));
		String digest = Base64.getEncoder().encodeToString(md.digest());
		String destination = this.getConfigurationUtil().getStreamNameFromTableName(tablename);

		if (!configUtil.isSynchPutsEnabled()) {
			// Asynchronous path: the outcome is delivered to putRecordCallback.
			ListenableFuture<UserRecordResult> putFuture = kinesis.addUserRecord(destination, digest, buffer);
			Futures.addCallback(putFuture, putRecordCallback, executor);
			return;
		}

		long time = System.currentTimeMillis();
		Future<UserRecordResult> putFuture = kinesis.addUserRecord(destination, digest, buffer);
		UserRecordResult result = putFuture.get(); // this does block

		if (!result.isSuccessful()) {
			// Log every attempt, then fail unconditionally: throwing from inside the loop
			// would let a failed result with no recorded attempts pass as a success.
			for (Attempt attempt : result.getAttempts()) {
				LOG.error(attempt.getErrorMessage());
			}
			throw new IOException("Record faild to replicate");
		}

		if (LOG.isDebugEnabled()) {
			LOG.debug(
					"Put record into shard= {} PartitionKey = {}, time={} 	"
					, result.getShardId()
					, digest
					, System.currentTimeMillis() - time);
		}
	}