public static void main()

in java/KinesisTestProducers/src/main/java/com/amazonaws/kinesis/producer/SampleKPLProducer.java [100:176]


    public static void main(String[] args) throws Exception
    {
        if (args.length != 2)
        {
            System.err.println("Usage SampleKPLProducer <stream name> <region>");
            System.exit(1);
        }

        String streamName = args[0];
        String regionName = args[1];

        final KinesisProducer producer = getKinesisProducer(regionName);

        final AtomicLong sequenceNumber = new AtomicLong(0);
        final AtomicLong completed = new AtomicLong(0);

        final FutureCallback<UserRecordResult> callback = new FutureCallback<UserRecordResult>()
        {
            @Override
            public void onFailure(Throwable t)
            {
                if (t instanceof UserRecordFailedException)
                {
                    Attempt last = Iterables.getLast(((UserRecordFailedException) t).getResult().getAttempts());
                    System.err.println(String.format("Record failed to put - %s : %s", last.getErrorCode(), last.getErrorMessage()));
                }
                System.err.println("Exception during put: " + t.getMessage());
                t.printStackTrace();
                System.exit(1);
            }

            @Override
            public void onSuccess(UserRecordResult result)
            {
                completed.getAndIncrement();
            }
        };

        final Runnable putOneRecord = new Runnable()
        {
            @Override
            public void run()
            {
                byte[] data = ProducerUtils.randomData(sequenceNumber.get(), ProducerConfig.RECORD_SIZE_BYTES);
                ListenableFuture<UserRecordResult> f = producer.addUserRecord(streamName, ProducerUtils.randomPartitionKey(),
                        ProducerUtils.randomExplicitHashKey(), ByteBuffer.wrap(data));
                Futures.addCallback(f, callback);
            }
        };

        EXECUTOR.scheduleAtFixedRate(new Runnable()
        {
            @Override
            public void run()
            {
                long put = sequenceNumber.get();
                long total = RECORDS_PER_SECOND * SECONDS_TO_RUN;
                double putPercent = 100.0 * put / total;
                long done = completed.get();
                double donePercent = 100.0 * done / total;
                System.out.println(String.format("Put %d of %d so far (%.2f %%), %d have completed (%.2f %%)", put, total, putPercent, done, donePercent));
            }
        }, 1, 1, TimeUnit.SECONDS);

        System.out.println(String.format("Starting puts... will run for %d seconds at %d records per second", SECONDS_TO_RUN, RECORDS_PER_SECOND));

        executeAtTargetRate(EXECUTOR, putOneRecord, sequenceNumber, SECONDS_TO_RUN, RECORDS_PER_SECOND);

        EXECUTOR.awaitTermination(SECONDS_TO_RUN + 1, TimeUnit.SECONDS);

        System.out.println("Waiting for remaining puts to finish...");
        producer.flushSync();
        System.out.println("All records complete.");

        producer.destroy();
        System.out.println("Finished.");
    }