public void runOnce()

in src/com/amazon/kinesis/streaming/agent/metrics/CWPublisherRunnable.java [101:151]


    /**
     * Performs a single pass of the publisher: either flushes a batch of queued datums or waits
     * for more to arrive.
     *
     * <p>While holding the {@code queue} monitor, the method decides whether to flush. A flush
     * happens when at least {@code bufferTimeMillis} has elapsed since {@code lastFlushTime}, when
     * the queue holds at least {@code flushSize} datums, or when {@code shuttingDown} is set. In
     * that case the batch returned by {@code queue.drain(flushSize)} is published after the
     * monitor has been released. Otherwise the method waits on the queue monitor for the remainder
     * of the buffer time and returns without publishing.
     *
     * <p>Anything thrown by the metrics publisher is logged and suppressed, except
     * {@link LinkageError}, which is rethrown. After a publish attempt (successful or not)
     * {@code lastFlushTime} is set to the current time plus the current jitter value, and a new
     * jitter value is drawn when {@code maxJitter} is positive.
     */
    public void runOnce() {
        List<MetricDatumWithKey<KeyType>> dataToPublish = null;
        synchronized (queue) {
            /*
             * We should send if:
             *
             * it's been at least bufferTimeMillis since our last send
             * or if the queue contains >= flushSize elements
             * or if we're shutting down
             */
            // lastFlushTime can lie in the future when a positive jitter was added to it, so the
            // elapsed time is clamped at zero.
            long timeSinceFlush = Math.max(0, getTime() - lastFlushTime);
            if (timeSinceFlush >= bufferTimeMillis || queue.size() >= flushSize || shuttingDown) {
                dataToPublish = queue.drain(flushSize);
                LOG.debug("Drained {} datums from queue", dataToPublish.size());

                if (shuttingDown) {
                    LOG.debug("Shutting down with {} datums left on the queue", queue.size());

                    // If we're shutting down, we successfully shut down only when the queue is empty.
                    shutdown = queue.isEmpty();
                }
            } else {
                // This branch is only reached when timeSinceFlush < bufferTimeMillis, so waitTime is
                // strictly positive and wait(0) (which would block indefinitely) cannot occur.
                long waitTime = bufferTimeMillis - timeSinceFlush;
                LOG.debug("Waiting up to {}ms for {} more datums to appear.", waitTime, flushSize
                        - queue.size());

                try {
                    // Wait for enqueues for at most the remainder of bufferTimeMillis.
                    queue.wait(waitTime);
                } catch (InterruptedException e) {
                    // Treat an interrupt like an early wake-up, but record it instead of
                    // swallowing it silently.
                    // NOTE(review): the interrupt flag is intentionally not restored here. If the
                    // caller invokes runOnce() in a loop (not visible from this method), a set flag
                    // would make every later queue.wait() throw immediately. Confirm against the
                    // caller before changing this.
                    LOG.debug("Interrupted while waiting for more datums to appear.", e);
                }
            }
        }

        // Publishing is done outside the synchronized block so the queue monitor is not held
        // while the publisher runs.
        if (dataToPublish != null) {
            try {
                metricsPublisher.publishMetrics(dataToPublish);
            } catch (Throwable t) {
                LOG.error("Caught exception thrown by metrics Publisher in CWPublisherRunnable", t);
                if (t instanceof LinkageError) {
                    throw t;
                }
            }
            // Changing the value of lastFlushTime will change the time when metrics are flushed next.
            lastFlushTime = getTime() + nextJitterValueToUse;
            // Random.nextInt(bound) rejects non-positive bounds, so jitter is only drawn for a
            // positive maxJitter; zero or negative values leave the jitter value unchanged.
            if (maxJitter > 0) {
                // nextJitterValueToUse will be a value in the range (-maxJitter, +maxJitter]
                nextJitterValueToUse = maxJitter - rand.nextInt(2 * maxJitter);
            }
        }
    }