protected def messageSource()

in src/it/scala/com/gu/kinesis/KinesisTestComponents.scala [32:47]


  /** Builds an endless, throttled stream of keyed test messages.
    *
    * One infinite sub-source is created per key. Keys are `keyPrefix` followed by an underscore
    * and a zero-padded, 1-based index (e.g. `key_001`); the messages of each key are numbered the
    * same way (`msg_001`, `msg_002`, ...). Every sub-source is throttled independently and the
    * sub-sources are merged, so no ordering between different keys is implied by this method.
    *
    * @param keyCount              number of distinct keys to emit messages for; must be at least 1
    * @param messageIntervalPerKey throttle interval applied to each key's sub-source
    *                              (one element per interval, shaping mode)
    * @param keyPrefix             prefix used when building key names
    * @return a source of key/message pairs that never completes on its own
    *         (presumably `KeyAndMessage` is a `(String, String)` alias -- confirm at its definition)
    * @throws IllegalArgumentException if `keyCount` is less than 1
    */
  protected def messageSource(
      keyCount: Int,
      messageIntervalPerKey: FiniteDuration,
      keyPrefix: String = "key"
  ): Source[KeyAndMessage, NotUsed] = {
    require(keyCount >= 1, s"keyCount must be at least 1, but was $keyCount")

    def mkKey(i: Int) = f"${keyPrefix}_$i%03d"

    def sourceForKey(key: String): Source[KeyAndMessage, NotUsed] = {
      def mkMessage(i: Int) = key -> f"msg_$i%03d"
      Source
        .fromIterator(() => Iterator.from(1).map(mkMessage))
        .throttle(elements = 1, per = messageIntervalPerKey, maximumBurst = 1, mode = ThrottleMode.shaping)
    }

    val sources = (1 to keyCount).map(mkKey).map(sourceForKey)

    // Source.combine needs two explicit sources, so a single key is returned as-is
    // instead of being rejected; there is nothing to merge in that case.
    if (sources.size == 1) sources.head
    else Source.combine(sources(0), sources(1), sources.drop(2): _*)(strategy = Merge(_))
  }