in src/it/scala/com/gu/kinesis/KinesisSourceTest.scala [273:297]
/**
  * Reads the stream described by `config` and logs what was read, as a diagnostic aid.
  *
  * A copy of `config` whose application name carries the `_streamDump` suffix is put
  * in implicit scope for the consumer opened here. The consumer's output is passed
  * through `extractKeyAndMessage` into `Inspectable.sink`, and the resulting snapshot
  * function is polled inside `eventually` until two consecutive polls return the same
  * non-empty content.
  *
  * The outcome is only logged: the collected messages at info level on success, or
  * the failure at error level when the polling block throws (e.g. `eventually` gives up).
  *
  * @param config configuration of the stream to dump
  */
private def dumpStream(config: TestStreamConfig): Unit = {
  implicit val dumpConfig: TestStreamConfig =
    config.copy(applicationName = s"${config.applicationName}_streamDump")

  withConsumerSource("dumpConsumer") { (kinesisSource, _) =>
    // Function returning what the sink has collected so far.
    val snapshot = kinesisSource.via(extractKeyAndMessage).runWith(Inspectable.sink)

    Try {
      var lastSeen = IndexedSeq.empty[KeyAndMessage]
      eventually {
        val current = snapshot()
        // The dump counts as finished once something has arrived and the content
        // has not changed since the previous poll.
        val settled = current.nonEmpty && current == lastSeen
        if (!settled) {
          lastSeen = current
          // Throwing makes `eventually` run this block again.
          throw new RuntimeException("Still dumping the stream.")
        }
      }
      lastSeen
    } match {
      case Failure(e) =>
        log.error(s"Could not dump the stream ${dumpConfig.streamName}.", e)
      case Success(messages) =>
        log.info(s"Stream ${dumpConfig.streamName} dump: \n${messages.mkString(",")}\n")
    }
  }
}