in kinesis-sink/src/main/java/com/amazonaws/hbase/datasink/KinesisDataSinkImpl.java [60:109]
/**
 * Builds the sink from the supplied configuration.
 *
 * <p>The constructor performs three setup steps:
 * <ol>
 *   <li>Obtains an MD5 {@link MessageDigest} and stores it in {@code md}.</li>
 *   <li>When synchronous puts are disabled, installs a no-op {@code putRecordCallback}
 *       and a 20-thread listening executor on which callbacks are meant to run.</li>
 *   <li>Starts a daemon thread that logs the producer's outstanding-record count and
 *       oldest-record age once per minute until it is interrupted.</li>
 * </ol>
 *
 * @param config configuration handed to the superclass constructor
 */
public KinesisDataSinkImpl(Configuration config) {
    super(config);
    this.configUtil = this.getConfigurationUtil();

    try {
        md = MessageDigest.getInstance("MD5");
    } catch (NoSuchAlgorithmException e) {
        // This should never happen as of java 7. The exception is handed to the logger,
        // so no separate printStackTrace() is needed. Note that md stays unset here.
        LOG.error(" Every implementation of the Java platform is required to support the following standard MessageDigest algorithms:\n"
                + "\n"
                + " MD5\n"
                + " SHA-1\n"
                + " SHA-256\n"
                + "", e);
    }

    if (!this.configUtil.isSynchPutsEnabled()) {
        LOG.info("Initilizing Asynchronous putRecords. We wil skip failed PutRecords will be lost! ");
        putRecordCallback = new FutureCallback<UserRecordResult>() {
            @Override
            public void onFailure(Throwable t) {
                // Intentionally a no-op: in asynchronous mode failed puts are dropped,
                // as announced by the log message above.
            }

            @Override
            public void onSuccess(UserRecordResult result) {
                // Nothing to do on success.
            }
        };
        // Thread pool on which the asynchronous put callbacks are executed.
        executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(20));
    }

    Thread statPrinter = new Thread() {
        @Override
        public void run() {
            // NOTE(review): assumes the kinesis field is assigned before this thread first
            // runs (the original code made the same assumption) - confirm.
            try {
                // Loop so the statistics are really periodic; previously the thread logged
                // a single time, slept for a minute and then terminated.
                while (!Thread.currentThread().isInterrupted()) {
                    LOG.info("KPL pending records {}, OldestRecord: {}",
                            kinesis.getOutstandingRecordsCount(),
                            kinesis.getOldestRecordTimeInMillis()
                    );
                    Thread.sleep(60 * 1000L);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the interruption stays observable.
                Thread.currentThread().interrupt();
            }
            LOG.info("KinesisSink preiodic stat printer died.");
        }
    };
    statPrinter.setName("KinesisSink-stat-printer");
    // Daemon: a purely diagnostic, endlessly looping thread must not keep the JVM alive.
    statPrinter.setDaemon(true);
    statPrinter.start();
}