in amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/CloudWatchPublisherRunnable.java [96:149]
public void runOnce() {
List<MetricDatumWithKey<CloudWatchMetricKey>> dataToPublish = null;
synchronized (queue) {
/*
* We should send if:
*
* it's been maxBufferTimeMillis since our last send
* or if the queue contains > batchSize elements
* or if we're shutting down
*/
long timeSinceFlush = Math.max(0, getTime() - lastFlushTime);
if (timeSinceFlush >= bufferTimeMillis || queue.size() >= flushSize || shuttingDown) {
dataToPublish = queue.drain(flushSize);
if (log.isDebugEnabled()) {
log.debug("Drained {} datums from queue", dataToPublish.size());
}
if (shuttingDown) {
if (log.isDebugEnabled()) {
log.debug("Shutting down with {} datums left on the queue", queue.size());
}
// If we're shutting down, we successfully shut down only when the queue is empty.
shutdown = queue.isEmpty();
}
} else {
long waitTime = bufferTimeMillis - timeSinceFlush;
if (log.isDebugEnabled()) {
log.debug("Waiting up to {} ms for {} more datums to appear.", waitTime, flushSize
- queue.size());
}
try {
// Wait for enqueues for up to maxBufferTimeMillis.
queue.wait(waitTime);
} catch (InterruptedException e) {
}
}
}
if (dataToPublish != null) {
try {
metricsPublisher.publishMetrics(dataToPublish);
} catch (Throwable t) {
log.error("Caught exception thrown by metrics Publisher in CloudWatchPublisherRunnable", t);
}
// Changing the value of lastFlushTime will change the time when metrics are flushed next.
lastFlushTime = getTime() + nextJitterValueToUse;
if (maxJitter != 0) {
// nextJittervalueToUse will be a value between (-maxJitter,+maxJitter)
nextJitterValueToUse = maxJitter - rand.nextInt(2 * maxJitter);
}
}
}