public void runOnce()

in amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/CloudWatchPublisherRunnable.java [96:149]


    /**
     * Runs a single iteration of the publishing cycle.
     *
     * <p>While holding the {@code queue} monitor, the method decides whether a flush is due. A flush
     * is due when at least {@code bufferTimeMillis} has elapsed since {@code lastFlushTime}, when the
     * queue holds at least {@code flushSize} elements, or when {@code shuttingDown} is set.
     * <ul>
     *   <li>If a flush is due, up to {@code flushSize} datums are drained from the queue. When
     *       shutting down, {@code shutdown} is set to whether the queue is empty after the drain.</li>
     *   <li>Otherwise the method waits on the queue monitor for at most the remaining buffer time
     *       and then returns without publishing.</li>
     * </ul>
     *
     * <p>Drained datums are handed to {@code metricsPublisher} outside the monitor. Anything thrown
     * by the publisher is caught and logged. Afterwards {@code lastFlushTime} is reset (offset by the
     * current jitter value) and, if {@code maxJitter} is non-zero, a new jitter value is picked.
     */
    public void runOnce() {
        List<MetricDatumWithKey<CloudWatchMetricKey>> batch = null;

        synchronized (queue) {
            // Clamp at zero so a lastFlushTime that lies ahead of the current time never yields a
            // negative elapsed value.
            final long elapsedMillis = Math.max(0, getTime() - lastFlushTime);
            final boolean flushDue =
                    elapsedMillis >= bufferTimeMillis || queue.size() >= flushSize || shuttingDown;

            if (!flushDue) {
                // Not reached unless elapsedMillis < bufferTimeMillis, so this is strictly positive.
                final long remainingMillis = bufferTimeMillis - elapsedMillis;
                if (log.isDebugEnabled()) {
                    log.debug("Waiting up to {} ms for {} more datums to appear.", remainingMillis,
                            flushSize - queue.size());
                }

                try {
                    // Releases the queue monitor while waiting, for at most the remaining buffer time.
                    queue.wait(remainingMillis);
                } catch (InterruptedException ignored) {
                    // The interruption is dropped and this pass ends without publishing.
                    // NOTE(review): the interrupt status is not restored here. Restoring it would
                    // make the next wait() throw immediately; confirm against the caller before
                    // changing this.
                }
            } else {
                batch = queue.drain(flushSize);
                if (log.isDebugEnabled()) {
                    log.debug("Drained {} datums from queue", batch.size());
                }

                if (shuttingDown) {
                    if (log.isDebugEnabled()) {
                        log.debug("Shutting down with {} datums left on the queue", queue.size());
                    }

                    // Shutdown only counts as complete once nothing is left on the queue.
                    shutdown = queue.isEmpty();
                }
            }
        }

        // Nothing was drained on this pass (we waited instead), so there is nothing to publish.
        if (batch == null) {
            return;
        }

        try {
            metricsPublisher.publishMetrics(batch);
        } catch (Throwable publishFailure) {
            log.error("Caught exception thrown by metrics Publisher in CloudWatchPublisherRunnable", publishFailure);
        }

        // Shifting lastFlushTime by the jitter moves the moment the next time-based flush becomes due.
        lastFlushTime = getTime() + nextJitterValueToUse;
        if (maxJitter != 0) {
            // Assuming rand.nextInt(bound) returns a value in [0, bound) as java.util.Random does,
            // the next jitter lies in (-maxJitter, +maxJitter] for a positive maxJitter.
            nextJitterValueToUse = maxJitter - rand.nextInt(2 * maxJitter);
        }
    }