public void emit()

in src/main/java/com/amazonaws/services/kinesis/aggregators/metrics/CloudWatchMetricsEmitter.java [66:151]


	public void emit(
			Map<UpdateKey, Map<String, AggregateAttributeModification>> metricData)
			throws Exception {
		if (metricData != null) {
			Date metricDate = null;

			for (UpdateKey key : metricData.keySet()) {
				PutMetricDataRequest req = new PutMetricDataRequest()
						.withNamespace(this.metricsNamespace);
				Collection<MetricDatum> data = new ArrayList<>();

				if (key.getDateValue().equals("*")) {
					LOG.debug("Not Emitting Cloudwatch Metrics for Time Horizon FOREVER");
					return;
				} else {
					try {
						metricDate = key.getDateValueAsDate();
					} catch (ParseException pe) {
						LOG.error(String.format(
								"Unable to Parse Date Value %s",
								key.getDateValue()));
						return;
					}
				}

				// send in every update as a datum
				for (String summary : metricData.get(key).keySet()) {
					final AggregateAttributeModification mod = metricData.get(
							key).get(summary);
					// TODO Handle that we've been sent an update for which a
					// new final value which might not have been set. This
					// means, for example, that on an hourly aggregate of FIRST,
					// we'd get a single modification at the beginning of the
					// hour, and then not again after
					if (mod.getFinalValue() != null) {
						data.add(new MetricDatum()
								.withMetricName(mod.getOriginatingValueName())
								.withTimestamp(metricDate)
								.withDimensions(
										new Dimension()
												.withName("Calculation")
												.withValue(
														mod.getCalculationApplied()
																.name()),
										new Dimension()
												.withName(
														key.getAggregateColumnName())
												.withValue(
														key.getAggregatedValue()))
								.withValue(mod.getFinalValue()));
					}
				}

				boolean success = false;
				int iterations = 0;
				int backoffMillis = BACKOFF_MILLIS;
				while (!success && iterations < MAX_WRITE_ATTEMPTS) {
					iterations++;
					boolean backoff = false;
					try {
						cloudWatchClient
								.putMetricData(req.withMetricData(data));
						success = true;
					} catch (LimitExceededException e) {
						backoff = true;
					} catch (AmazonServiceException ase) {
						if (ase.getErrorCode().startsWith("Throttling")) {
							backoff = true;
						}
					}

					if (backoff) {
						LOG.warn("CloudWatch Limit Exceeded - backing off");
						Thread.sleep(2 ^ iterations * BACKOFF_MILLIS);
					}
				}

				if (!success) {
					throw new MetricsEmitterThrottledException(
							String.format(
									"CloudWatch Metrics Emitter failed to write metrics after %s attempts",
									MAX_WRITE_ATTEMPTS));
				}
			}
		}
	}