public static void main()

in java/amazon-kinesis-producer-sample/src/com/amazonaws/services/kinesis/producer/sample/MetricsAwareSampleProducer.java [62:193]


    public static void main(String[] args) throws InterruptedException, ExecutionException {
        final long totalRecordsToPut = 50000;
        final int dataSize = 64;
        final long outstandingLimit = 5000;
        
        final AtomicLong sequenceNumber = new AtomicLong(0);
        final AtomicLong completed = new AtomicLong(0);
        final String timetstamp = Long.toString(System.currentTimeMillis());
        
        KinesisProducerConfiguration config = new KinesisProducerConfiguration()
                .setRecordMaxBufferedTime(3000)
                .setMaxConnections(1)
                .setRequestTimeout(60000)
                .setRegion(SampleProducerConfig.REGION_DEFAULT);
        
        final KinesisProducer kinesisProducer = new KinesisProducer(config);
        
        // Result handler
        final FutureCallback<UserRecordResult> callback = new FutureCallback<UserRecordResult>() {
            @Override
            public void onFailure(Throwable t) {
                if (t instanceof UserRecordFailedException) {
                    Attempt last = Iterables.getLast(
                            ((UserRecordFailedException) t).getResult().getAttempts());
                    log.error(String.format(
                            "Record failed to put - %s : %s",
                            last.getErrorCode(), last.getErrorMessage()));
                }
                log.error("Exception during put", t);
                System.exit(1);
            }

            @Override
            public void onSuccess(UserRecordResult result) {
                completed.getAndIncrement();
            }
        };
        
        // Progress updates
        Thread progress = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    long put = sequenceNumber.get();
                    double putPercent = 100.0 * put / totalRecordsToPut;
                    long done = completed.get();
                    double donePercent = 100.0 * done / totalRecordsToPut;
                    log.info(String.format(
                            "Put %d of %d so far (%.2f %%), %d have completed (%.2f %%)",
                            put, totalRecordsToPut, putPercent, done, donePercent));
                    
                    if (done == totalRecordsToPut) {
                        break;
                    }

                    // Numerous metrics are available from the KPL locally, as
                    // well as uploaded to CloudWatch. See the metrics
                    // documentation for details.
                    //
                    // KinesisProducer provides methods to retrieve metrics for
                    // the current instance, with a customizable time window.
                    // This allows us to get sliding window statistics in real
                    // time for the current host.
                    //
                    // Here we're going to look at the number of user records
                    // put over a 5 seconds sliding window.
                    try {
                        for (Metric m : kinesisProducer.getMetrics("UserRecordsPut", 5)) {
                            // Metrics are emitted at different granularities, here
                            // we only look at the stream level metric, which has a
                            // single dimension of stream name.
                            if (m.getDimensions().size() == 1 && m.getSampleCount() > 0) {
                                log.info(String.format(
                                        "(Sliding 5 seconds) Avg put rate: %.2f per sec, success rate: %.2f, failure rate: %.2f, total attemped: %d",
                                        m.getSum() / 5,
                                        m.getSum() / m.getSampleCount() * 100,
                                        (m.getSampleCount() - m.getSum()) / m.getSampleCount() * 100,
                                        (long) m.getSampleCount()));
                            }
                        }
                    } catch (Exception e) {
                        log.error("Unexpected error getting metrics", e);
                        System.exit(1);
                    }
                    
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {}
                }
            }
        });
        progress.start();
        
        // Put records
        while (true) {
            // We're going to put as fast as we can until we've reached the max
            // number of records outstanding.
            if (sequenceNumber.get() < totalRecordsToPut) {
                if (kinesisProducer.getOutstandingRecordsCount() < outstandingLimit) {
                    ByteBuffer data = Utils.generateData(sequenceNumber.incrementAndGet(), dataSize);
                    ListenableFuture<UserRecordResult> f = kinesisProducer.addUserRecord(SampleProducerConfig.STREAM_NAME_DEFAULT,
                            timetstamp, data);
                    Futures.addCallback(f, callback, Executors.newSingleThreadExecutor());
                } else {
                    Thread.sleep(1);
                }
            } else {
                break;
            }
        }
        
        // Wait for remaining records to finish
        while (kinesisProducer.getOutstandingRecordsCount() > 0) {
            kinesisProducer.flush();
            Thread.sleep(100);
        }
        
        progress.join();
        
        for (Metric m : kinesisProducer.getMetrics("UserRecordsPerKinesisRecord")) {
            if (m.getDimensions().containsKey("ShardId")) {
                log.info(String.format(
                        "%.2f user records were aggregated into each Kinesis record on average for shard %s, for a total of %d Kinesis records.",
                        m.getMean(),
                        m.getDimensions().get("ShardId"),
                        (long) m.getSampleCount()));
            }
        }
        
        kinesisProducer.destroy();
        log.info("Finished.");
    }