private ScalingOperationReport scaleStream()

in src/main/java/com/amazonaws/services/kinesis/scaling/StreamScaler.java [289:430]


	/**
	 * Iteratively reshapes the shards held in {@code shardStack} so that each
	 * one covers {@code 1 / targetShards} of the hash space, splitting shards
	 * that are too wide and merging (or split-then-merging) shards that are too
	 * narrow.
	 * <p>
	 * Each pass pops one shard from the stack and compares its width to the
	 * target using {@code StreamScalingUtils.softCompare}:
	 * <ul>
	 * <li>narrower than target: the next shard is popped too; the pair is
	 * either merged outright (combined width not above target) or the second
	 * shard is split so that its lower part can be merged into the first;</li>
	 * <li>equal to target: the shard is left as it is;</li>
	 * <li>wider than target: the shard is split at the target width and the
	 * remainder is pushed back for further processing.</li>
	 * </ul>
	 * The loop ends when the stack is empty, or earlier when the tracked shard
	 * count hits {@code minCount} / {@code maxCount} and the target lies on or
	 * beyond that cap.
	 *
	 * @param streamName         name of the stream, used in log/exception
	 *                           messages and passed to the report
	 * @param originalShardCount shard count before scaling; only used to
	 *                           derive the reported scale direction
	 * @param targetShards       desired number of shards
	 * @param operationsMade     running count of split/merge operations,
	 *                           carried in from the caller
	 * @param shardsCompleted    running count of shards finished, used for
	 *                           progress reporting
	 * @param startTime          passed through to {@code reportProgress}
	 * @param shardStack         working stack of shards still to be processed;
	 *                           mutated by this method
	 * @param minCount           optional lower cap on shard count, may be null
	 * @param maxCount           optional upper cap on shard count, may be null
	 * @return the report built by {@code reportFor} for the final status
	 * @throws Exception if a null entry is popped from the stack, or if a
	 *                   split/merge call fails
	 */
	private ScalingOperationReport scaleStream(String streamName, int originalShardCount, int targetShards,
			int operationsMade, int shardsCompleted, long startTime, Stack<ShardHashInfo> shardStack, Integer minCount,
			Integer maxCount) throws Exception {
		final double targetPct = 1d / targetShards;
		final boolean checkMinMax = minCount != null || maxCount != null;
		final ScaleDirection scaleDirection = originalShardCount >= targetShards ? ScaleDirection.DOWN
				: ScaleDirection.UP;

		// id of the most recently popped lower shard, kept for diagnostics
		String lastShardLower = null;

		// seed the current shard count from the working stack
		int currentCount = shardStack.size();

		// we'll run iteratively until the shard stack is emptied or we reach
		// one of the caps
		ScalingCompletionStatus endStatus = ScalingCompletionStatus.Ok;
		do {
			if (checkMinMax) {
				// stop scaling if we've reached the min or max count
				boolean stopOnCap = false;
				String message = null;
				if (minCount != null && currentCount == minCount && targetShards <= minCount) {
					stopOnCap = true;
					// only "already at" the cap when nothing had to be done
					endStatus = operationsMade == 0 ? ScalingCompletionStatus.AlreadyAtMinimum
							: ScalingCompletionStatus.Ok;
					message = String.format("%s: Minimum Shard Count of %s Reached", streamName, minCount);
				}
				if (maxCount != null && currentCount == maxCount && targetShards >= maxCount) {
					stopOnCap = true;
					endStatus = operationsMade == 0 ? ScalingCompletionStatus.AlreadyAtMaximum
							: ScalingCompletionStatus.Ok;
					message = String.format("%s: Maximum Shard Count of %s Reached", streamName, maxCount);
				}
				if (stopOnCap) {
					LOG.info(message);
					return reportFor(endStatus, streamName, operationsMade, scaleDirection);
				}
			}

			// report progress every shard completed
			if (shardsCompleted > 0) {
				reportProgress(streamName, shardsCompleted, currentCount, shardStack.size(), startTime);
			}

			// once the stack is emptied, return a report of the hash space
			// allocation
			if (shardStack.empty()) {
				return reportFor(endStatus, streamName, operationsMade, scaleDirection);
			}

			ShardHashInfo lowerShard = shardStack.pop();
			if (lowerShard == null) {
				throw new Exception(String.format("%s: Null ShardHashInfo retrieved after processing %s", streamName,
						lastShardLower));
			}
			lastShardLower = lowerShard.getShardId();

			// compare once; the result drives all three branches below
			final int widthVsTarget = StreamScalingUtils.softCompare(lowerShard.getPctWidth(), targetPct);

			if (widthVsTarget < 0) {
				if (shardStack.empty()) {
					// our current shard is smaller than the target size, but
					// there's nothing else to do
					return reportFor(endStatus, streamName, operationsMade, scaleDirection);
				}

				// get the next higher shard
				ShardHashInfo higherShard = shardStack.pop();
				if (higherShard == null) {
					// fail with context instead of a bare NullPointerException
					throw new Exception(String.format("%s: Null ShardHashInfo retrieved after processing %s",
							streamName, lastShardLower));
				}

				if (StreamScalingUtils.softCompare(lowerShard.getPctWidth() + higherShard.getPctWidth(),
						targetPct) > 0) {
					// The two lowest shards together are larger than the
					// target size, so split the upper at the target offset and
					// merge the lower of the two new shards to the lowest shard.
					// The last argument is the id of the shard now on top of
					// the stack, falling back to the shard being split.
					AdjacentShards splitUpper = higherShard.doSplit(kinesisClient,
							targetPct - lowerShard.getPctWidth(), shardStack.isEmpty() ? higherShard.getShardId()
									: shardStack.lastElement().getShardId());
					operationsMade++;

					// place the upper of the two new shards onto the stack
					shardStack.push(splitUpper.getHigherShard());

					// merge lower of the new shards with the lowest shard
					LOG.info(String.format("Merging Shard %s with %s", lowerShard.getShardId(),
							splitUpper.getLowerShard().getShardId()));
					ShardHashInfo lowerMerged = new AdjacentShards(streamName, lowerShard,
							splitUpper.getLowerShard()).doMerge(kinesisClient,
									shardStack.isEmpty() ? splitUpper.getHigherShard().getShardId()
											: shardStack.lastElement().getShardId());
					// the merge is an operation in its own right
					operationsMade++;
					LOG.info(String.format("Created Shard %s (%s)", lowerMerged.getShardId(),
							pctFormat.format(lowerMerged.getPctWidth())));
					shardsCompleted++;

					// count of shards is unchanged in this case as we've
					// just rebalanced, so current count is not updated
				} else {
					// The lower and upper shards together are smaller than
					// the target size, so merge the two shards together
					ShardHashInfo lowerMerged = new AdjacentShards(streamName, lowerShard, higherShard)
							.doMerge(kinesisClient, shardStack.isEmpty() ? higherShard.getShardId()
									: shardStack.lastElement().getShardId());
					// count the merge so that a scale-down which stops on the
					// minimum cap is not misreported as AlreadyAtMinimum
					operationsMade++;
					shardsCompleted++;
					currentCount--;

					// put the new shard back on the stack - it may still be
					// too small relative to the target
					shardStack.push(lowerMerged);
				}
			} else if (widthVsTarget > 0) {
				// lowest shard is larger than the target size so split at the
				// target offset
				AdjacentShards splitLower = lowerShard.doSplit(kinesisClient, targetPct,
						shardStack.isEmpty() ? lowerShard.getShardId() : shardStack.lastElement().getShardId());
				operationsMade++;

				LOG.info(String.format("Split Shard %s at %s Creating Final Shard %s and Intermediate Shard %s (%s)",
						lowerShard.getShardId(), pctFormat.format(targetPct), splitLower.getLowerShard().getShardId(),
						splitLower.getHigherShard().getShardId(),
						pctFormat.format(splitLower.getHigherShard().getPctWidth())));

				// push the higher of the two splits back onto the stack
				shardStack.push(splitLower.getHigherShard());
				shardsCompleted++;
				currentCount++;
			}
			// widthVsTarget == 0: shard is already at the correct size - move on
		} while (!shardStack.isEmpty());

		return reportFor(endStatus, streamName, operationsMade, scaleDirection);
	}