in kinesis-sink/src/main/java/com/amazonaws/hbase/datasink/KinesisDataSinkImpl.java [146:182]
public void putRecord(ByteBuffer buffer, String tablename,String partition) throws IOException, InterruptedException, ExecutionException {
if (kinesis == null) { // creating the producer when there is a request.
KinesisProducerConfiguration config = configUtil.getKPLConfiguration();
LOG.debug("First Time producer. endpoint " + config.getKinesisEndpoint() + " port: " + config.getKinesisPort() );
this.kinesis = KinesisProducerFactory.getProducer(config);
}
md.update(partition.getBytes());
String digest = Base64.getEncoder().encodeToString(md.digest());
String destination = this.getConfigurationUtil().getStreamNameFromTableName(tablename);
long time = System.currentTimeMillis();
if (configUtil.isSynchPutsEnabled()) {
Future<UserRecordResult> putFuture = (Future<UserRecordResult>) kinesis.addUserRecord(destination, digest, buffer);
UserRecordResult result = putFuture.get(); // this does block
if (result.isSuccessful()) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Put record into shard= {} PartitionKey = {}, time={} "
, result.getShardId()
, digest
, System.currentTimeMillis() - time);
}
} else {
for (Attempt attempt : result.getAttempts()) {
LOG.error(attempt.getErrorMessage());
throw new IOException("Record faild to replicate");
}
}
} else {
ListenableFuture<UserRecordResult> putFuture = kinesis.addUserRecord(destination, digest, buffer);
Futures.addCallback(putFuture,putRecordCallback, executor);
}
}