in src/main/java/com/amazonaws/services/dynamodbv2/streams/connectors/DynamoDBReplicationEmitter.java [387:424]
protected synchronized void emitCloudWatchMetrics(final List<Record> records, final List<Record> failures, final AtomicInteger retryCount) {
AmazonCloudWatchAsync cloudwatch = CLOUDWATCH.get();
if (null == cloudwatch) {
return;
}
if (isShutdown) {
if (records.isEmpty() && failures.isEmpty()) {
log.warn("emitCloudWatchMetrics called after shutdown. Continuing because records and failures lists are empty");
return;
} else {
throw new IllegalStateException("emitCloudWatchMetrics called after shutdown");
}
}
final List<MetricDatum> metrics = new ArrayList<MetricDatum>();
final double successful = records.size() - failures.size();
if (successful > 0) {
metrics.add(new MetricDatum().withMetricName(RECORDS_WRITTEN).withValue(successful).withUnit(StandardUnit.Count).withTimestamp(new Date()));
}
final double retries = retryCount.get();
if (retries > 0) {
metrics.add(new MetricDatum().withMetricName(RECORDS_RETRIED).withValue(retries).withUnit(StandardUnit.Count).withTimestamp(new Date()));
}
if (metrics.isEmpty()) {
return;
}
final PutMetricDataRequest request = new PutMetricDataRequest().withNamespace(applicationName).withMetricData(metrics);
cloudwatch.putMetricDataAsync(request, new AsyncHandler<PutMetricDataRequest, PutMetricDataResult>() {
@Override
public void onSuccess(PutMetricDataRequest request, PutMetricDataResult result) {
log.trace("Published metric: " + request);
}
@Override
public void onError(Exception exception) {
log.error("Could not publish metric: " + request, exception);
}
});
}