in src/main/java/com/amazonaws/services/kinesis/scaling/StreamScaler.java [441:493]
public ScalingOperationReport updateShardCount(String streamName, int currentShardCount, int targetShardCount,
Integer minShards, Integer maxShards, boolean waitForCompletion) throws Exception {
ScaleDirection scaleDirection = getScaleDirection(currentShardCount, targetShardCount);
if (scaleDirection == ScaleDirection.NONE) {
LOG.info(String.format("No Scaling Action being taken as current and target Shard count = %s",
currentShardCount));
return reportFor(ScalingCompletionStatus.NoActionRequired, streamName, 0, ScaleDirection.NONE);
}
// catch already at minimum
if (targetShardCount < 1) {
throw new AlreadyOneShardException();
}
// ensure we stay between min/max
if (minShards != null && targetShardCount < minShards) {
targetShardCount = minShards;
} else if (maxShards != null && targetShardCount > maxShards) {
targetShardCount = maxShards;
}
try {
LOG.info(String.format("Updating Stream %s Shard Count to %s", streamName, targetShardCount));
UpdateShardCountRequest req = UpdateShardCountRequest.builder()
.scalingType(ScalingType.UNIFORM_SCALING).streamName(streamName)
.targetShardCount(targetShardCount).build();
this.kinesisClient.updateShardCount(req);
// block until the stream transitions back to active state
LOG.info("Waiting for Stream to transition back to Active Status");
StreamScalingUtils.waitForStreamStatus(this.kinesisClient, streamName, "ACTIVE");
// return the current state of the stream
if (waitForCompletion) {
return reportFor(ScalingCompletionStatus.Ok, streamName, 1, scaleDirection);
} else {
return null;
}
} catch (InvalidArgumentException | LimitExceededException ipe) {
// this will be raised if the scaling operation we are
// trying to make is not within the limits of the
// UpdateShardCount API
// http://docs.aws.amazon.com/kinesis/latest/APIReference/API_UpdateShardCount.html
//
// so now we'll default back to the split/merge way
// return the current state of the stream
LOG.info("UpdateShardCount API Limit Exceeded. Falling back to manual scaling");
return scaleStream(streamName, currentShardCount, targetShardCount, 0, 0,
System.currentTimeMillis(), minShards, maxShards);
}
}