in src/main/java/com/amazonaws/services/kinesis/scaling/auto/StreamMonitor.java [90:374]
/**
 * Analyses the CloudWatch utilisation samples gathered for each monitored
 * Kinesis operation type (PUT/GET) against the stream's maximum capacity,
 * resolves the per-operation scaling votes into a single scaling direction,
 * and - cool-off period permitting - submits the resulting scale up/down
 * action via the scaler.
 *
 * @param currentUtilisationMetrics per-operation, per-metric map of
 *            CloudWatch datapoint aggregates observed for the stream
 * @param streamMaxCapacity the stream's maximum capacity per operation and
 *            metric, used to convert absolute values into utilisation
 *            percentages
 * @param cwSampleDuration the number of sample periods requested from
 *            CloudWatch; periods with no datapoint are counted as 'low'
 *            samples
 * @param now the current time, used only for cool-off interval checks
 * @return a report of the scaling action taken, a 'no action required'
 *         report when capacity is within tolerances, or null if the
 *         scaling operation failed (the failure is logged, not rethrown)
 */
protected ScalingOperationReport processCloudwatchMetrics(
        Map<KinesisOperationType, Map<StreamMetric, Map<Datapoint, Double>>> currentUtilisationMetrics,
        Map<KinesisOperationType, StreamMetrics> streamMaxCapacity, int cwSampleDuration, DateTime now) {
    ScalingOperationReport report = null;
    ScaleDirection finalScaleDirection = null;

    // for each type of operation that the customer has requested profiling
    // (PUT, GET)
    Map<KinesisOperationType, ScaleDirection> scaleVotes = new HashMap<>();

    for (Map.Entry<KinesisOperationType, Map<StreamMetric, Map<Datapoint, Double>>> entry : currentUtilisationMetrics
            .entrySet()) {
        // set the default scaling vote to 'do nothing'
        scaleVotes.put(entry.getKey(), ScaleDirection.NONE);

        // per metric: (high sample count, low sample count, latest utilisation average)
        Map<StreamMetric, Triplet<Integer, Integer, Double>> perMetricSamples = new HashMap<>();
        StreamMetric higherUtilisationMetric;
        Double higherUtilisationPct;

        // process each metric type, including Records and Bytes
        for (StreamMetric metric : StreamMetric.values()) {
            double streamMax = 0D;
            double currentMax = 0D;
            double currentPct = 0D;
            double latestAvg = 0d;
            DateTime lastTime = null;
            int lowSamples = 0;
            int highSamples = 0;

            Map<Datapoint, Double> metrics = new HashMap<>();
            if (!currentUtilisationMetrics.containsKey(entry.getKey()) || !entry.getValue().containsKey(metric)) {
                // we have no samples for this type of metric which is ok -
                // they'll later be counted as low metrics
            } else {
                metrics = entry.getValue().get(metric);
            }

            // if we got nothing back, then there are no operations of the
            // given type happening, so this is a full 'low sample'
            if (metrics.size() == 0) {
                lowSamples = this.config.getScaleDown().getScaleAfterMins();
            }

            // the thresholds are loop-invariant; compute the fractional forms
            // once instead of boxing a new Double per datapoint
            double scaleUpThresholdFraction = this.config.getScaleUp().getScaleThresholdPct() / 100D;
            double scaleDownThresholdFraction = this.config.getScaleDown().getScaleThresholdPct() / 100D;

            // process the data point aggregates retrieved from CloudWatch
            // and log scale up/down votes by period
            for (Map.Entry<Datapoint, Double> datapointEntry : metrics.entrySet()) {
                currentMax = datapointEntry.getValue();
                streamMax = streamMaxCapacity.get(entry.getKey()).get(metric);
                currentPct = currentMax / streamMax;
                LOG.info(String.format(
                        "Utilisation of %s %s %.2f%% at %s upon current value of %.2f and Stream max of %.2f",
                        entry.getKey().name(), metric, currentPct * 100,
                        formatter.format(datapointEntry.getKey().timestamp()), currentMax, streamMax));

                // keep track of the last measures
                // NOTE(review): HashMap entrySet() iteration order is
                // undefined, so this 'latest' tracking (and the moving
                // average seeded from it) is order-dependent - confirm the
                // datapoints are intended to arrive in time order
                DateTime datapointTime = new DateTime(datapointEntry.getKey().timestamp().toEpochMilli());
                if (lastTime == null || datapointTime.isAfter(lastTime)) {
                    // latest average is a simple moving average
                    latestAvg = latestAvg == 0d ? currentPct : (latestAvg + currentPct) / 2;
                }
                lastTime = datapointTime;

                // if the pct for the datapoint exceeds the configured threshold, then count a
                // high sample, otherwise it's a low sample
                if (currentPct > scaleUpThresholdFraction) {
                    highSamples++;
                } else if (currentPct < scaleDownThresholdFraction) {
                    lowSamples++;
                }
            }

            // add low samples for the periods which we didn't get any
            // data points, if there are any
            if (metrics.size() < cwSampleDuration) {
                lowSamples += cwSampleDuration - metrics.size();
            }

            LOG.info(String.format("%s %s performance analysis: %s high samples, and %s low samples",
                    entry.getKey().name(), metric, highSamples, lowSamples));

            // merge the per-stream metric samples together for the
            // operation
            // NOTE(review): each StreamMetric occurs exactly once in this
            // loop, so the merge branch below is currently unreachable; it
            // is retained in case a metric is ever processed twice
            if (!perMetricSamples.containsKey(metric)) {
                // create a new sample entry
                perMetricSamples.put(metric, new Triplet<>(highSamples, lowSamples, latestAvg));
            } else {
                // merge the samples
                Triplet<Integer, Integer, Double> previousHighLow = perMetricSamples.get(metric);
                Triplet<Integer, Integer, Double> newHighLow = new Triplet<>(
                        previousHighLow.getValue0() + highSamples, previousHighLow.getValue1() + lowSamples,
                        (previousHighLow.getValue2() + latestAvg) / 2);
                perMetricSamples.put(metric, newHighLow);
            }
        }

        /*-
         * we now have per metric samples for this operation type
         *
         * For Example:
         *
         * Metric | High Samples | Low Samples | Pct Used
         * Bytes  |      3       |      0      |   .98
         * Records|      0       |     10      |   .2
         *
         * Check these values against the provided configuration. If we have
         * been above the 'scaleAfterMins' with high samples for either
         * metric, then we scale up. If not, then if we've been below the
         * scaleAfterMins with low samples, then we scale down. Otherwise
         * the vote stays as NONE
         */

        // first find out which of the dimensions of stream utilisation are
        // higher - we'll use the higher of the two for time checks
        if (perMetricSamples.get(StreamMetric.Bytes).getValue2() >= perMetricSamples.get(StreamMetric.Records)
                .getValue2()) {
            higherUtilisationMetric = StreamMetric.Bytes;
            higherUtilisationPct = perMetricSamples.get(StreamMetric.Bytes).getValue2();
        } else {
            higherUtilisationMetric = StreamMetric.Records;
            higherUtilisationPct = perMetricSamples.get(StreamMetric.Records).getValue2();
        }

        LOG.info(String.format(
                "Will decide scaling action based on metric %s[%s] due to highest utilisation metric value %.2f%%",
                entry.getKey(), higherUtilisationMetric, higherUtilisationPct * 100));

        // vote UP if enough high samples were observed, else DOWN if enough
        // low samples; otherwise the default NONE vote stands
        if (perMetricSamples.get(higherUtilisationMetric).getValue0() >= config.getScaleUp().getScaleAfterMins()) {
            scaleVotes.put(entry.getKey(), ScaleDirection.UP);
        } else if (perMetricSamples.get(higherUtilisationMetric).getValue1() >= config.getScaleDown()
                .getScaleAfterMins()) {
            scaleVotes.put(entry.getKey(), ScaleDirection.DOWN);
        }
    }

    // process the scaling votes
    ScaleDirection getVote = scaleVotes.get(KinesisOperationType.GET);
    ScaleDirection putVote = scaleVotes.get(KinesisOperationType.PUT);
    LOG.info(String.format("Scaling Votes - GET: %s, PUT: %s", getVote, putVote));

    // check if we have both get and put votes - if we have both then
    // implement the decision matrix
    if (getVote != null && putVote != null) {
        // If either of the votes are to scale up, then do so.
        // If both votes are DOWN, then scale down.
        // Otherwise do nothing.
        if (getVote == ScaleDirection.UP || putVote == ScaleDirection.UP) {
            finalScaleDirection = ScaleDirection.UP;
        } else if (getVote == ScaleDirection.DOWN && putVote == ScaleDirection.DOWN) {
            finalScaleDirection = ScaleDirection.DOWN;
        } else {
            finalScaleDirection = ScaleDirection.NONE;
        }
    } else {
        // we only have get or put votes, so use the non-null one
        finalScaleDirection = (getVote == null ? putVote : getVote);
    }

    // if no metrics were supplied at all then neither vote exists; treat
    // that as 'no action' rather than NPE'ing on the equals() calls below
    // (previously the NPE was swallowed by the blanket catch)
    if (finalScaleDirection == null) {
        finalScaleDirection = ScaleDirection.NONE;
    }

    LOG.debug(String.format("Determined Scaling Direction %s", finalScaleDirection));

    try {
        int currentShardCount = this.scaler.getOpenShardCount(this.config.getStreamName());
        LOG.debug(String.format("Current Shard Count: %s", currentShardCount));

        // if the metric stats indicate a scale up or down, then do the action
        Integer minShards = this.config.getMinShards();
        Integer maxShards = this.config.getMaxShards();
        if (finalScaleDirection.equals(ScaleDirection.UP)) {
            Integer scaleCount = this.config.getScaleUp().getScaleCount();
            Integer scalePct = this.config.getScaleUp().getScalePct();

            // check the cool down interval
            if (lastScaleUp != null
                    && now.minusMinutes(this.config.getScaleUp().getCoolOffMins()).isBefore(lastScaleUp)) {
                LOG.info(String.format(
                        "Stream %s: Deferring Scale Up until Cool Off Period of %s Minutes has elapsed",
                        this.config.getStreamName(), this.config.getScaleUp().getCoolOffMins()));
            } else {
                // submit a scale up task
                int newTarget = StreamScalingUtils.getNewShardCount(currentShardCount, scaleCount, scalePct,
                        ScaleDirection.UP, minShards, maxShards);
                LOG.debug(String.format("Calculated new Target Shard Count of %s", newTarget));

                if (newTarget != currentShardCount && newTarget > 0) {
                    LOG.info(String.format(
                            "Requesting Scale Up of Stream %s by %s to %s as %s has been above %s%% for %s Minutes",
                            this.config.getStreamName(), (scaleCount != null) ? scaleCount : scalePct + "%",
                            newTarget, this.config.getScaleOnOperations().toString(),
                            this.config.getScaleUp().getScaleThresholdPct(),
                            this.config.getScaleUp().getScaleAfterMins()));

                    // the count-vs-percent distinction was already applied when
                    // computing newTarget, so a single update call suffices (the
                    // original duplicated an identical call in both branches)
                    report = this.scaler.updateShardCount(this.config.getStreamName(), currentShardCount,
                            newTarget, minShards, maxShards, false);

                    lastScaleUp = new DateTime(System.currentTimeMillis());

                    // send SNS notifications
                    if (report != null && this.config.getScaleUp().getNotificationARN() != null
                            && this.snsClient != null) {
                        StreamScalingUtils.sendNotification(this.snsClient,
                                this.config.getScaleUp().getNotificationARN(), "Kinesis Autoscaling - Scale Up",
                                report.asJson());
                    }
                } else {
                    LOG.info(
                            "Not requesting a scaling action because new shard count equals current shard count, or new shard count is 0");
                }
            }
        } else if (finalScaleDirection.equals(ScaleDirection.DOWN)) {
            Integer scaleCount = this.config.getScaleDown().getScaleCount();
            Integer scalePct = this.config.getScaleDown().getScalePct();

            // check the cool down interval
            if (lastScaleDown != null
                    && now.minusMinutes(this.config.getScaleDown().getCoolOffMins()).isBefore(lastScaleDown)) {
                LOG.info(String.format(
                        "Stream %s: Deferring Scale Down until Cool Off Period of %s Minutes has elapsed",
                        this.config.getStreamName(), this.config.getScaleDown().getCoolOffMins()));
            } else {
                // submit a scale down
                try {
                    int newTarget = StreamScalingUtils.getNewShardCount(currentShardCount, scaleCount, scalePct,
                            ScaleDirection.DOWN, minShards, maxShards);
                    if (newTarget != currentShardCount && newTarget > 0) {
                        LOG.info(String.format(
                                "Requesting Scale Down of Stream %s by %s to %s as %s has been below %s%% for %s Minutes",
                                this.config.getStreamName(), (scaleCount != null) ? scaleCount : scalePct + "%",
                                newTarget, config.getScaleOnOperations().toString(),
                                this.config.getScaleDown().getScaleThresholdPct(),
                                this.config.getScaleDown().getScaleAfterMins()));

                        report = this.scaler.updateShardCount(this.config.getStreamName(), currentShardCount,
                                newTarget, minShards, maxShards, false);

                        lastScaleDown = new DateTime(System.currentTimeMillis());

                        // send SNS notifications
                        if (report != null && this.config.getScaleDown().getNotificationARN() != null
                                && this.snsClient != null) {
                            StreamScalingUtils.sendNotification(this.snsClient,
                                    this.config.getScaleDown().getNotificationARN(),
                                    "Kinesis Autoscaling - Scale Down",
                                    report.asJson());
                        }
                    } else {
                        LOG.info("Scale down factor too small to result in a scaling change");
                    }
                } catch (AlreadyOneShardException aose) {
                    // do nothing - we're already at 1 shard
                    LOG.info(String.format("Stream %s: Not Scaling Down - Already at Minimum of 1 Shard",
                            this.config.getStreamName()));
                }
            }
        } else {
            // scale direction not set, so we're not going to scale
            // up or down - everything fine
            LOG.info("No Scaling required - Stream capacity within specified tolerances");
            return this.scaler.reportFor(ScalingCompletionStatus.NoActionRequired, this.config.getStreamName(), 0,
                    finalScaleDirection);
        }
    } catch (Exception e) {
        LOG.error("Failed to process stream " + this.config.getStreamName(), e);
    }

    return report;
}