in src/it/scala/com/gu/kinesis/KinesisTestComponents.scala [32:47]
/** Builds a merged stream of key/message pairs, one throttled sub-stream per key.
  *
  * Keys are `<keyPrefix>_NNN` for `NNN` in `1..keyCount` (zero-padded to three digits).
  * Each key produces an unbounded sequence of messages `msg_001`, `msg_002`, ... and
  * is shaped to at most one element per `messageIntervalPerKey`.
  *
  * @param keyCount              number of distinct keys; must be at least 2
  * @param messageIntervalPerKey minimum spacing between consecutive messages of one key
  * @param keyPrefix             prefix used when generating key names
  * @return a source emitting the merged per-key streams
  * @throws IllegalArgumentException if `keyCount` is less than 2
  */
protected def messageSource(
  keyCount: Int,
  messageIntervalPerKey: FiniteDuration,
  keyPrefix: String = "key"
): Source[KeyAndMessage, NotUsed] = {
  // Source.combine needs two mandatory sources, hence the lower bound.
  require(keyCount >= 2)

  // One endless, rate-limited stream of numbered messages for a single key.
  def throttledStream(key: String): Source[KeyAndMessage, NotUsed] = {
    val numberedMessages = () => Iterator.from(1).map(n => key -> f"msg_$n%03d")
    Source
      .fromIterator(numberedMessages)
      .throttle(
        elements = 1,
        per = messageIntervalPerKey,
        maximumBurst = 1,
        mode = ThrottleMode.shaping
      )
  }

  val perKeyStreams = (1 to keyCount).map { idx =>
    throttledStream(f"${keyPrefix}_$idx%03d")
  }

  val firstStream = perKeyStreams.head
  val secondStream = perKeyStreams(1)
  val remainingStreams = perKeyStreams.drop(2)

  Source.combine(firstStream, secondStream, remainingStreams: _*)(strategy = Merge(_))
}