in java/amazon-kinesis-producer-sample/src/com/amazonaws/services/kinesis/producer/sample/SampleProducer.java [80:186]
public static void main(String[] args) throws Exception {
final SampleProducerConfig config = new SampleProducerConfig(args);
log.info(String.format("Stream name: %s Region: %s secondsToRun %d",config.getStreamName(), config.getRegion(),
config.getSecondsToRun()));
log.info(String.format("Will attempt to run the KPL at %f MB/s...",(config.getDataSize() * config
.getRecordsPerSecond())/(1000000.0)));
final KinesisProducer producer = new KinesisProducer(config.transformToKinesisProducerConfiguration());
// The monotonically increasing sequence number we will put in the data of each record
final AtomicLong sequenceNumber = new AtomicLong(0);
// The number of records that have finished (either successfully put, or failed)
final AtomicLong completed = new AtomicLong(0);
// KinesisProducer.addUserRecord is asynchronous. A callback can be used to receive the results.
final FutureCallback<UserRecordResult> callback = new FutureCallback<UserRecordResult>() {
@Override
public void onFailure(Throwable t) {
// If we see any failures, we will log them.
if (t instanceof UserRecordFailedException) {
int attempts = ((UserRecordFailedException) t).getResult().getAttempts().size()-1;
Attempt last = ((UserRecordFailedException) t).getResult().getAttempts().get(attempts);
if(attempts > 1) {
Attempt previous = ((UserRecordFailedException) t).getResult().getAttempts().get(attempts - 1);
log.error(String.format(
"Record failed to put - %s : %s. Previous failure - %s : %s",
last.getErrorCode(), last.getErrorMessage(), previous.getErrorCode(), previous.getErrorMessage()));
}else{
log.error(String.format(
"Record failed to put - %s : %s.",
last.getErrorCode(), last.getErrorMessage()));
}
} else if (t instanceof UnexpectedMessageException) {
log.error("Record failed to put due to unexpected message received from native layer",
t);
}
log.error("Exception during put", t);
}
@Override
public void onSuccess(UserRecordResult result) {
completed.getAndIncrement();
}
};
final ExecutorService callbackThreadPool = Executors.newCachedThreadPool();
// The lines within run() are the essence of the KPL API.
final Runnable putOneRecord = new Runnable() {
@Override
public void run() {
ByteBuffer data = Utils.generateData(sequenceNumber.get(), config.getDataSize());
// TIMESTAMP is our partition key
ListenableFuture<UserRecordResult> f =
producer.addUserRecord(config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data);
Futures.addCallback(f, callback, callbackThreadPool);
}
};
// This gives us progress updates
EXECUTOR.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
long put = sequenceNumber.get();
long total = config.getRecordsPerSecond() * config.getSecondsToRun();
double putPercent = 100.0 * put / total;
long done = completed.get();
double donePercent = 100.0 * done / total;
log.info(String.format(
"Put %d of %d so far (%.2f %%), %d have completed (%.2f %%)",
put, total, putPercent, done, donePercent));
log.info(String.format("Oldest future as of now in millis is %s", producer.getOldestRecordTimeInMillis
()));
}
}, 1, 1, TimeUnit.SECONDS);
// Kick off the puts
log.info(String.format(
"Starting puts... will run for %d seconds at %d records per second", config.getSecondsToRun(),
config.getRecordsPerSecond()));
executeAtTargetRate(EXECUTOR, putOneRecord, sequenceNumber, config.getSecondsToRun(),
config.getRecordsPerSecond());
// Wait for puts to finish. After this statement returns, we have
// finished all calls to putRecord, but the records may still be
// in-flight. We will additionally wait for all records to actually
// finish later.
EXECUTOR.awaitTermination(config.getSecondsToRun() + 1, TimeUnit.SECONDS);
// If you need to shutdown your application, call flushSync() first to
// send any buffered records. This method will block until all records
// have finished (either success or fail). There are also asynchronous
// flush methods available.
//
// Records are also automatically flushed by the KPL after a while based
// on the time limit set with Configuration.setRecordMaxBufferedTime()
log.info("Waiting for remaining puts to finish...");
producer.flushSync();
log.info("All records complete.");
// This kills the child process and shuts down the threads managing it.
producer.destroy();
log.info("Finished.");
}