in src/it/scala/com/gu/kinesis/Inspectable.scala [74:87]
def waitForNrOfCheckpointsPerShard(minNumberOfShards: Int, checkpointCount: Int)(implicit
patienceConfig: PatienceConfig
): Unit = {
checkpointEventsByShardConsumer.clear()
eventually {
val currentCheckpointCounts = checkpointCountByShardConsumer()
// The first checkpoint may already be in-progress when we inspect acked checkpoints.
val minCheckpointCountDelta = checkpointCount + 1
val shardConsumersWithEnoughCheckpoints = currentCheckpointCounts.collect {
case (shardConsumer, count) if count >= minCheckpointCountDelta => shardConsumer
}
require(shardConsumersWithEnoughCheckpoints.size >= minNumberOfShards)
}
}