in src/main/java/com/amazonaws/services/kinesis/scaling/auto/StreamMonitor.java [377:458]
/**
 * Main loop of the stream monitor.
 *
 * Loads the stream's current max capacity, then repeatedly: queries the
 * utilisation metrics for the last N minutes (N being the larger of the
 * scale-up and scale-down "scale after" periods), passes them to
 * {@code processCloudwatchMetrics}, notifies the configured report listener
 * when a report is produced, periodically reloads the max capacity, and
 * sleeps for the configured check interval.
 *
 * The loop ends when {@code keepRunning} is false or the thread is
 * interrupted while sleeping. An exception thrown while processing metrics
 * is logged and the loop continues; any other exception ends the monitor
 * and is stored in {@code this.exception} after being logged.
 */
public void run() {
    LOG.info(String.format("Started Stream Monitor for %s", config.getStreamName()));
    DateTime lastShardCapacityRefreshTime = new DateTime(System.currentTimeMillis());

    // create a StreamMetricManager object
    StreamMetricManager metricManager = new StreamMetricManager(this.config.getStreamName(),
            this.config.getScaleOnOperations(), this.cloudWatchClient, this.kinesisClient);

    LOG.info(String.format("Using Stream Scaler Version %s", StreamScaler.version));

    try {
        // load the current configured max capacity
        metricManager.loadMaxCapacity();

        // configure the duration to request from cloudwatch
        int cwSampleDuration = Math.max(config.getScaleUp().getScaleAfterMins(),
                config.getScaleDown().getScaleAfterMins());

        do {
            DateTime now = new DateTime(System.currentTimeMillis());

            // fetch only the last N minutes metrics
            DateTime metricStartTime = now.minusMinutes(cwSampleDuration);

            // load the current cloudwatch metrics for the stream via the
            // metrics manager. NOTE: this call is outside the inner try, so
            // a failure here ends the monitor through the outer catch.
            @SuppressWarnings("rawtypes")
            Map currentUtilisationMetrics = metricManager.queryCurrentUtilisationMetrics(cwSampleDuration,
                    metricStartTime, now);

            // process the aggregated set of Cloudwatch Datapoints
            try {
                ScalingOperationReport report = processCloudwatchMetrics(currentUtilisationMetrics,
                        metricManager.getStreamMaxCapacity(), cwSampleDuration, now);

                if (report != null) {
                    // refresh the current max capacity after the
                    // modification
                    metricManager.loadMaxCapacity();
                    lastShardCapacityRefreshTime = now;

                    // notify all report listeners that we've completed a
                    // scaling operation
                    if (this.config.getScalingOperationReportListener() != null) {
                        this.config.getScalingOperationReportListener().onReport(report);
                    }

                    if (report.getScaleDirection() != ScaleDirection.NONE) {
                        LOG.info(report.toString());
                    }
                }
            } catch (Exception e) {
                // keep running even though the scaling action had an
                // exception, but log the throwable so the stack trace is
                // not lost
                LOG.error(e.getMessage(), e);
            }

            // refresh shard stats every configured period, in case someone
            // has manually updated the number of shards
            if (now.minusMinutes(this.config.getRefreshShardsNumberAfterMin())
                    .isAfter(lastShardCapacityRefreshTime)) {
                metricManager.loadMaxCapacity();
                lastShardCapacityRefreshTime = now;
            }

            try {
                LOG.info(String.format("Next Check Cycle in %s seconds", this.config.getCheckInterval()));
                // multiply as long so a large interval cannot overflow int
                Thread.sleep(this.config.getCheckInterval() * 1000L);
            } catch (InterruptedException e) {
                LOG.error(e.getMessage(), e);
                // restore the interrupt flag so the owner of this thread
                // can still observe the interruption
                Thread.currentThread().interrupt();
                break;
            }
        } while (keepRunning);

        LOG.info(String.format("Stream Monitor for %s in %s Completed. Exiting.", this.config.getStreamName(),
                this.config.getRegion()));
    } catch (Exception e) {
        // record the failure first so it is available even if logging fails
        this.exception = e;
        LOG.error(String.format("Stream Monitor for %s terminated by an unexpected error",
                this.config.getStreamName()), e);
    }
}