private def dumpStream()

in src/it/scala/com/gu/kinesis/KinesisSourceTest.scala [273:297]


  private def dumpStream(config: TestStreamConfig): Unit = {
    implicit val dumpConfig: TestStreamConfig = config.copy(applicationName = s"${config.applicationName}_streamDump")
    withConsumerSource("dumpConsumer") { (kinesisSource, _) =>
      val inspectReceived = kinesisSource.via(extractKeyAndMessage).runWith(Inspectable.sink)

      val result = Try {
        var received = IndexedSeq.empty[KeyAndMessage]
        eventually {
          val newReceived = inspectReceived()
          if (newReceived.isEmpty || newReceived != received) {
            received = newReceived
            throw new RuntimeException(s"Still dumping the stream.")
          }
        }
        received
      }
      result match {
        case Success(messages) =>
          log.info(s"Stream ${dumpConfig.streamName} dump: \n${messages.mkString(",")}\n")

        case Failure(e) =>
          log.error(s"Could not dump the stream ${dumpConfig.streamName}.", e)
      }
    }
  }