in src/main/java/com/amazonaws/services/kinesis/aggregators/metrics/CloudWatchMetricsEmitter.java [66:151]
public void emit(
Map<UpdateKey, Map<String, AggregateAttributeModification>> metricData)
throws Exception {
if (metricData != null) {
Date metricDate = null;
for (UpdateKey key : metricData.keySet()) {
PutMetricDataRequest req = new PutMetricDataRequest()
.withNamespace(this.metricsNamespace);
Collection<MetricDatum> data = new ArrayList<>();
if (key.getDateValue().equals("*")) {
LOG.debug("Not Emitting Cloudwatch Metrics for Time Horizon FOREVER");
return;
} else {
try {
metricDate = key.getDateValueAsDate();
} catch (ParseException pe) {
LOG.error(String.format(
"Unable to Parse Date Value %s",
key.getDateValue()));
return;
}
}
// send in every update as a datum
for (String summary : metricData.get(key).keySet()) {
final AggregateAttributeModification mod = metricData.get(
key).get(summary);
// TODO Handle that we've been sent an update for which a
// new final value which might not have been set. This
// means, for example, that on an hourly aggregate of FIRST,
// we'd get a single modification at the beginning of the
// hour, and then not again after
if (mod.getFinalValue() != null) {
data.add(new MetricDatum()
.withMetricName(mod.getOriginatingValueName())
.withTimestamp(metricDate)
.withDimensions(
new Dimension()
.withName("Calculation")
.withValue(
mod.getCalculationApplied()
.name()),
new Dimension()
.withName(
key.getAggregateColumnName())
.withValue(
key.getAggregatedValue()))
.withValue(mod.getFinalValue()));
}
}
boolean success = false;
int iterations = 0;
int backoffMillis = BACKOFF_MILLIS;
while (!success && iterations < MAX_WRITE_ATTEMPTS) {
iterations++;
boolean backoff = false;
try {
cloudWatchClient
.putMetricData(req.withMetricData(data));
success = true;
} catch (LimitExceededException e) {
backoff = true;
} catch (AmazonServiceException ase) {
if (ase.getErrorCode().startsWith("Throttling")) {
backoff = true;
}
}
if (backoff) {
LOG.warn("CloudWatch Limit Exceeded - backing off");
Thread.sleep(2 ^ iterations * BACKOFF_MILLIS);
}
}
if (!success) {
throw new MetricsEmitterThrottledException(
String.format(
"CloudWatch Metrics Emitter failed to write metrics after %s attempts",
MAX_WRITE_ATTEMPTS));
}
}
}
}