public void run()

in src/main/java/com/amazonaws/services/kinesis/scaling/auto/StreamMonitor.java [377:458]


	/**
	 * Main monitoring loop for the configured stream.
	 * <p>
	 * Loads the stream's current max capacity, then repeatedly:
	 * <ol>
	 * <li>queries the utilisation metrics for the most recent sample window (the
	 * larger of the scale-up and scale-down "scale after" durations, in
	 * minutes),</li>
	 * <li>passes them to {@code processCloudwatchMetrics}; when that returns a
	 * report, reloads the max capacity, notifies the configured report listener
	 * (if any) and logs the report unless its direction is
	 * {@code ScaleDirection.NONE},</li>
	 * <li>reloads the max capacity if the configured refresh period has elapsed
	 * since the last reload,</li>
	 * <li>sleeps for the configured check interval (seconds).</li>
	 * </ol>
	 * The loop ends when {@code keepRunning} is false after a cycle, or when the
	 * thread is interrupted while sleeping (the interrupt status is restored
	 * before exiting). Exceptions raised while processing metrics are logged and
	 * the loop continues; any other exception terminates the loop, is logged, and
	 * is stored in {@code this.exception}.
	 */
	public void run() {
		LOG.info(String.format("Started Stream Monitor for %s", config.getStreamName()));
		DateTime lastShardCapacityRefreshTime = new DateTime(System.currentTimeMillis());

		// create a StreamMetricManager object
		StreamMetricManager metricManager = new StreamMetricManager(this.config.getStreamName(),
				this.config.getScaleOnOperations(), this.cloudWatchClient, this.kinesisClient);

		LOG.info(String.format("Using Stream Scaler Version %s", StreamScaler.version));

		try {
			// load the current configured max capacity
			metricManager.loadMaxCapacity();

			// configure the duration to request from cloudwatch
			int cwSampleDuration = Math.max(config.getScaleUp().getScaleAfterMins(),
					config.getScaleDown().getScaleAfterMins());

			do {
				DateTime now = new DateTime(System.currentTimeMillis());

				// fetch only the last N minutes metrics
				DateTime metricStartTime = now.minusMinutes(cwSampleDuration);

				// load the current cloudwatch metrics for the stream via the
				// metrics manager. NOTE: this call is outside the inner try, so
				// a failure here ends the monitor via the outer catch.
				@SuppressWarnings("rawtypes")
				Map currentUtilisationMetrics = metricManager.queryCurrentUtilisationMetrics(cwSampleDuration,
						metricStartTime, now);

				// process the aggregated set of Cloudwatch Datapoints
				try {
					ScalingOperationReport report = processCloudwatchMetrics(currentUtilisationMetrics,
							metricManager.getStreamMaxCapacity(), cwSampleDuration, now);

					if (report != null) {
						// refresh the current max capacity after the
						// modification
						metricManager.loadMaxCapacity();
						lastShardCapacityRefreshTime = now;

						// notify all report listeners that we've completed a
						// scaling operation
						if (this.config.getScalingOperationReportListener() != null) {
							this.config.getScalingOperationReportListener().onReport(report);
						}

						if (report.getScaleDirection() != ScaleDirection.NONE) {
							LOG.info(report.toString());
						}
					}
				} catch (Exception e) {
					// keep running even though the scaling action had an
					// exception; log the throwable itself so the stack trace
					// is not lost (the message alone may be null)
					LOG.error(e.getMessage(), e);
				}

				// refresh shard stats every configured period, in case someone
				// has manually updated the number of shards
				if (now.minusMinutes(this.config.getRefreshShardsNumberAfterMin())
						.isAfter(lastShardCapacityRefreshTime)) {
					metricManager.loadMaxCapacity();
					lastShardCapacityRefreshTime = now;
				}

				try {
					LOG.info(String.format("Next Check Cycle in %s seconds", this.config.getCheckInterval()));
					// multiply as long so large intervals cannot overflow int
					// before being widened for Thread.sleep
					Thread.sleep(this.config.getCheckInterval() * 1000L);
				} catch (InterruptedException e) {
					LOG.error(e.getMessage(), e);
					// restore the interrupt status so whoever owns this thread
					// can still observe that it was interrupted
					Thread.currentThread().interrupt();
					break;
				}
			} while (keepRunning);

			LOG.info(String.format("Stream Monitor for %s in %s Completed. Exiting.", this.config.getStreamName(),
					this.config.getRegion()));
		} catch (Exception e) {
			// record the failure for whoever inspects this monitor, and log it
			// so the termination is visible even if nobody polls the field
			LOG.error(String.format("Stream Monitor for %s terminated by exception", this.config.getStreamName()),
					e);
			this.exception = e;
		}
	}