in src/main/java/com/amazonaws/services/kinesis/scaling/StreamScaler.java [289:430]
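/**
 * Core resharding loop: repeatedly pops the shard with the lowest remaining hash
 * range off the working stack and merges or splits until every shard covers
 * roughly 1 / targetShards of the keyspace, stopping early and returning a
 * ScalingOperationReport when an optional min or max shard count cap is hit.
 */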
private ScalingOperationReport scaleStream(String streamName, int originalShardCount, int targetShards,
int operationsMade, int shardsCompleted, long startTime, Stack<ShardHashInfo> shardStack, Integer minCount,
Integer maxCount) throws Exception {
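// the width each shard should cover, as a fraction of the total hash space
// (e.g. targetShards = 4 gives targetPct = 0.25)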
final double targetPct = 1d / targetShards;
boolean checkMinMax = minCount != null || maxCount != null;
String lastShardLower = null;
String lastShardHigher = null;
ScaleDirection scaleDirection = originalShardCount >= targetShards ? ScaleDirection.DOWN : ScaleDirection.UP;
// seed the current shard count from the working stack
int currentCount = shardStack.size();
// we'll run iteratively until the shard stack is emptied or we reach
// one of the caps
ScalingCompletionStatus endStatus = ScalingCompletionStatus.Ok;
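// each pass consumes the shard with the lowest remaining hash range; the
// lower/higher naming below relies on the stack being ordered this way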
do {
if (checkMinMax) {
// stop scaling if we've reached the min or max count
boolean stopOnCap = false;
String message = null;
if (minCount != null && currentCount == minCount && targetShards <= minCount) {
stopOnCap = true;
if (operationsMade == 0) {
endStatus = ScalingCompletionStatus.AlreadyAtMinimum;
} else {
endStatus = ScalingCompletionStatus.Ok;
}
message = String.format("%s: Minimum Shard Count of %s Reached", streamName, minCount);
}
if (maxCount != null && currentCount == maxCount && targetShards >= maxCount) {
if (operationsMade == 0) {
endStatus = ScalingCompletionStatus.AlreadyAtMaximum;
} else {
endStatus = ScalingCompletionStatus.Ok;
}
message = String.format("%s: Maximum Shard Count of %s Reached", streamName, maxCount);
stopOnCap = true;
}
if (stopOnCap) {
LOG.info(message);
return reportFor(endStatus, streamName, operationsMade, scaleDirection);
}
}
// report progress on each pass once at least one shard has been completed
if (shardsCompleted > 0) {
reportProgress(streamName, shardsCompleted, currentCount, shardStack.size(), startTime);
}
// once the stack is emptied, return a report of the hash space
// allocation
if (shardStack.empty()) {
return reportFor(endStatus, streamName, operationsMade, scaleDirection);
}
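// take the shard with the lowest remaining hash range as the working shard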
ShardHashInfo lowerShard = shardStack.pop();
if (lowerShard != null) {
lastShardLower = lowerShard.getShardId();
} else {
throw new Exception(String.format("%s: Null ShardHashInfo retrieved after processing %s", streamName,
lastShardLower));
}
// compare the lowest shard's width against the target once, then branch
// three ways: narrower, exact, or wider than the target width
final int widthVsTarget = StreamScalingUtils.softCompare(lowerShard.getPctWidth(), targetPct);
if (widthVsTarget < 0) {
if (shardStack.empty()) {
// our current shard is smaller than the target size, but
// there's nothing else to do
return reportFor(endStatus, streamName, operationsMade, scaleDirection);
} else {
// get the next higher shard
ShardHashInfo higherShard = shardStack.pop();
if (higherShard != null) {
lastShardHigher = higherShard.getShardId();
} else {
// fail fast with context rather than hitting a NullPointerException below
throw new Exception(String.format("%s: Null ShardHashInfo retrieved after processing %s", streamName,
lastShardHigher));
}
if (StreamScalingUtils.softCompare(lowerShard.getPctWidth() + higherShard.getPctWidth(),
targetPct) > 0) {
// The two lowest shards together are larger than the target size,
// so split the upper shard at the target offset and merge the lower
// of the two new shards into the lowest shard
AdjacentShards splitUpper = higherShard.doSplit(kinesisClient,
targetPct - lowerShard.getPctWidth(), shardStack.isEmpty() ? higherShard.getShardId()
: shardStack.lastElement().getShardId());
operationsMade++;
// place the upper of the two new shards onto the stack
shardStack.push(splitUpper.getHigherShard());
// merge lower of the new shards with the lowest shard
LOG.info(String.format("Merging Shard %s with %s", lowerShard.getShardId(),
splitUpper.getLowerShard().getShardId()));
ShardHashInfo lowerMerged = new AdjacentShards(streamName, lowerShard,
splitUpper.getLowerShard()).doMerge(kinesisClient,
shardStack.isEmpty() ? splitUpper.getHigherShard().getShardId()
: shardStack.lastElement().getShardId());
LOG.info(String.format("Created Shard %s (%s)", lowerMerged.getShardId(),
pctFormat.format(lowerMerged.getPctWidth())));
shardsCompleted++;
// the shard count is unchanged in this case: the split added one shard
// and the merge removed one, so currentCount is not updated
} else {
// The lower and upper shards together are no larger than the
// target size, so merge the two shards together
ShardHashInfo lowerMerged = new AdjacentShards(streamName, lowerShard, higherShard)
.doMerge(kinesisClient, shardStack.isEmpty() ? higherShard.getShardId()
: shardStack.lastElement().getShardId());
// the merge is itself a scaling operation, so count it
operationsMade++;
shardsCompleted++;
currentCount--;
// put the new shard back on the stack - it may still be
// too small relative to the target
shardStack.push(lowerMerged);
}
}
} else if (widthVsTarget == 0) {
// at the correct size - move on
} else {
// lowest shard is larger than the target size so split at the
// target offset
AdjacentShards splitLower = lowerShard.doSplit(kinesisClient, targetPct,
shardStack.isEmpty() ? lowerShard.getShardId() : shardStack.lastElement().getShardId());
operationsMade++;
LOG.info(String.format("Split Shard %s at %s Creating Final Shard %s and Intermediate Shard %s (%s)",
lowerShard.getShardId(), pctFormat.format(targetPct), splitLower.getLowerShard().getShardId(),
splitLower.getHigherShard().getShardId(), pctFormat.format(splitLower.getHigherShard().getPctWidth())));
// push the higher of the two splits back onto the stack
shardStack.push(splitLower.getHigherShard());
shardsCompleted++;
currentCount++;
}
} while (!shardStack.empty());
return reportFor(endStatus, streamName, operationsMade, scaleDirection);
}
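/*
 * A minimal sketch of how this routine might be driven (assumed caller shape;
 * buildShardStack is a hypothetical helper standing in for whatever builds the
 * ordered stack of open shards elsewhere in StreamScaler):
 *
 * Stack<ShardHashInfo> stack = buildShardStack(kinesisClient, streamName); // hypothetical
 * ScalingOperationReport report = scaleStream(streamName, stack.size(), 4, 0, 0,
 * System.currentTimeMillis(), stack, null, null);
 */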