public KinesisDataSinkImpl()

in kinesis-sink/src/main/java/com/amazonaws/hbase/datasink/KinesisDataSinkImpl.java [60:109]


	public KinesisDataSinkImpl(Configuration config)  {
		super(config);
		this.configUtil = this.getConfigurationUtil();
		
		try {
			md = MessageDigest.getInstance("MD5");
		} catch (NoSuchAlgorithmException e) {
			//This should never happen as of java 7.
			LOG.error(" Every implementation of the Java platform is required to support the following standard MessageDigest algorithms:\n"
					+ "\n"
					+ "    MD5\n"
					+ "    SHA-1\n"
					+ "    SHA-256\n"
					+ "", e);
			e.printStackTrace();
		}
		if ( this.configUtil.isSynchPutsEnabled() == false) {
				LOG.info("Initilizing Asynchronous putRecords. We wil skip  failed PutRecords will be lost! ");
				putRecordCallback = new FutureCallback<UserRecordResult>() {     
					@Override 
					public void onFailure(Throwable t) {
						/* Analyze and respond to the failure  */
					};     
					@Override 
					public void onSuccess(UserRecordResult result) { 
						/* Respond to the success */ 
					};
			};
		    // callback threadpool
			executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(20));
		}
		
		Thread statPrinter = new Thread(){
		
		    public void run(){
		    	try {
		    		LOG.info("KPL pending records {}, OldestRecord: {}", 
		    				kinesis.getOutstandingRecordsCount(),
		    				kinesis.getOldestRecordTimeInMillis()
		    				);
					Thread.sleep(60 * 1000L);
				} catch (InterruptedException e) {
					LOG.info("KinesisSink preiodic stat printer died.");
					e.printStackTrace();
				}
		    }
		};

		statPrinter.start();
	}