in java/KinesisTestProducers/src/main/java/com/amazonaws/kinesis/producer/SampleKPLProducer.java [100:176]
public static void main(String[] args) throws Exception
{
if (args.length != 2)
{
System.err.println("Usage SampleKPLProducer <stream name> <region>");
System.exit(1);
}
String streamName = args[0];
String regionName = args[1];
final KinesisProducer producer = getKinesisProducer(regionName);
final AtomicLong sequenceNumber = new AtomicLong(0);
final AtomicLong completed = new AtomicLong(0);
final FutureCallback<UserRecordResult> callback = new FutureCallback<UserRecordResult>()
{
@Override
public void onFailure(Throwable t)
{
if (t instanceof UserRecordFailedException)
{
Attempt last = Iterables.getLast(((UserRecordFailedException) t).getResult().getAttempts());
System.err.println(String.format("Record failed to put - %s : %s", last.getErrorCode(), last.getErrorMessage()));
}
System.err.println("Exception during put: " + t.getMessage());
t.printStackTrace();
System.exit(1);
}
@Override
public void onSuccess(UserRecordResult result)
{
completed.getAndIncrement();
}
};
final Runnable putOneRecord = new Runnable()
{
@Override
public void run()
{
byte[] data = ProducerUtils.randomData(sequenceNumber.get(), ProducerConfig.RECORD_SIZE_BYTES);
ListenableFuture<UserRecordResult> f = producer.addUserRecord(streamName, ProducerUtils.randomPartitionKey(),
ProducerUtils.randomExplicitHashKey(), ByteBuffer.wrap(data));
Futures.addCallback(f, callback);
}
};
EXECUTOR.scheduleAtFixedRate(new Runnable()
{
@Override
public void run()
{
long put = sequenceNumber.get();
long total = RECORDS_PER_SECOND * SECONDS_TO_RUN;
double putPercent = 100.0 * put / total;
long done = completed.get();
double donePercent = 100.0 * done / total;
System.out.println(String.format("Put %d of %d so far (%.2f %%), %d have completed (%.2f %%)", put, total, putPercent, done, donePercent));
}
}, 1, 1, TimeUnit.SECONDS);
System.out.println(String.format("Starting puts... will run for %d seconds at %d records per second", SECONDS_TO_RUN, RECORDS_PER_SECOND));
executeAtTargetRate(EXECUTOR, putOneRecord, sequenceNumber, SECONDS_TO_RUN, RECORDS_PER_SECOND);
EXECUTOR.awaitTermination(SECONDS_TO_RUN + 1, TimeUnit.SECONDS);
System.out.println("Waiting for remaining puts to finish...");
producer.flushSync();
System.out.println("All records complete.");
producer.destroy();
System.out.println("Finished.");
}