in src/it/scala/com/gu/kinesis/KinesisTestComponents.scala [53:73]
/** Runs `closure` against a Kinesis consumer source, then attempts a best-effort shutdown.
  *
  * The source is built by `KinesisSource` from `config.kclConfig(workerId)` and
  * `config.shardCheckpointConfig`, reports into a fresh [[InspectableConsumerStats]], and has a
  * single kill switch attached. Its materialized value (worker termination future plus kill
  * switch) is exposed as a future through `liftMaterializedValue`.
  *
  * @param workerId identifier passed to `config.kclConfig` for this consumer
  * @param closure  test body; receives the consumer source and the stats instance it reports to
  * @param config   stream configuration providing the KCL and shard checkpoint settings
  * @tparam A       result type of the test body
  * @return whatever `closure` returned; if `closure` threw a non-fatal exception, that same
  *         exception is rethrown after the shutdown attempt
  */
protected def withConsumerSource[A](workerId: String)(
  closure: (Source[KinesisRecord, NotUsed], InspectableConsumerStats) => A
)(implicit config: TestStreamConfig): A = {
  val stats = new InspectableConsumerStats
  val (source, materialized) = liftMaterializedValue {
    KinesisSource(
      KinesisSource.createKclWorker,
      config.kclConfig(workerId),
      config.shardCheckpointConfig,
      stats
    ).viaMat(KillSwitches.single)(Keep.both)
  }

  // Best-effort teardown: any failure here is dropped on purpose so that it can never mask
  // the outcome of the test body itself.
  def stopWorkerQuietly(): Unit = {
    Try {
      // Zero wait: this fails straight away when the materialized value is not available yet
      // (presumably because the closure never ran the source), leaving nothing to shut down.
      val (workerTerminated, switch) = Await.result(materialized, 0.second)
      switch.shutdown()
      Await.ready(workerTerminated, KinesisResourceManager.WorkerTerminationTimeout)
    }
    ()
  }

  // Capture the outcome first so the shutdown attempt runs whether or not the body threw.
  val outcome = Try(closure(source, stats))
  stopWorkerQuietly()
  outcome.get
}