in src/it/scala/com/gu/kinesis/KinesisResourceManager.scala [67:99]
/**
  * Asks Kinesis to update the shard count of `streamName` to `newActiveShardCount` using
  * uniform scaling, then polls until the stream lists the expected number of shards.
  *
  * The expected number is the shard count listed before the update plus
  * `newActiveShardCount`.
  * NOTE(review): presumably this is because `describeStream` keeps listing the closed
  * parent shards next to the newly opened ones — confirm against the Kinesis docs.
  *
  * Polling re-checks every 2 seconds, bounded by `ReshardStreamTimeout`; each attempt
  * fails with `IllegalArgumentException` (via `require`) while the counts differ.
  *
  * @param regionName          region handed to `withKinesisClient` to obtain the client
  * @param streamName          name of the stream to re-shard
  * @param newActiveShardCount target shard count sent in the update request
  */
def reshardStream(regionName: String, streamName: String, newActiveShardCount: Int): Unit =
  withKinesisClient(regionName) { kinesis =>
    // Shards listed by one describeStream call; only that single response is inspected.
    def listedShardCount(): Int = {
      val describeRequest = DescribeStreamRequest
        .builder()
        .streamName(streamName)
        .build()
      kinesis.describeStream(describeRequest).streamDescription().shards().size
    }

    // Capture the baseline before issuing the update so the target total is stable.
    val shardsBeforeUpdate = listedShardCount()
    val shardsAfterUpdate = shardsBeforeUpdate + newActiveShardCount

    val updateRequest = UpdateShardCountRequest
      .builder()
      .streamName(streamName)
      .targetShardCount(newActiveShardCount)
      .scalingType(ScalingType.UNIFORM_SCALING)
      .build()
    kinesis.updateShardCount(updateRequest)

    // Re-query until the listed total matches the expected one or the timeout elapses.
    eventually(timeout(ReshardStreamTimeout), interval(2.second)) {
      require(listedShardCount() == shardsAfterUpdate)
    }
  }