protected def withConsumerSource[A]()

in src/it/scala/com/gu/kinesis/KinesisTestComponents.scala [53:73]


  /** Runs `closure` against a Kinesis consumer source and then attempts to stop the consumer.
    *
    * A `KinesisSource` is built from `config.kclConfig(workerId)` and
    * `config.shardCheckpointConfig`, with a single kill switch attached, and is passed
    * through `liftMaterializedValue`. The resulting source and a fresh
    * [[InspectableConsumerStats]] are handed to `closure`.
    *
    * Once `closure` has returned or thrown, a best-effort shutdown is attempted. Any
    * non-fatal failure raised during that shutdown is discarded, so the outcome of
    * `closure` is always what the caller observes.
    *
    * @param workerId identifier passed to `config.kclConfig` when building the consumer
    * @param closure  test body receiving the consumer source and the stats instance wired into it
    * @param config   implicit test stream configuration supplying the KCL and checkpoint settings
    * @tparam A       result type of `closure`
    * @return the value produced by `closure`; a non-fatal exception thrown by `closure` is
    *         rethrown after the shutdown attempt
    */
  protected def withConsumerSource[A](
      workerId: String
  )(closure: (Source[KinesisRecord, NotUsed], InspectableConsumerStats) => A)(implicit config: TestStreamConfig): A = {
    val stats = new InspectableConsumerStats
    // NOTE(review): liftMaterializedValue presumably exposes the stream's materialized value
    // (worker termination future + kill switch) as a Future - confirm against its definition.
    val (source, materialized) = liftMaterializedValue {
      KinesisSource(
        KinesisSource.createKclWorker,
        config.kclConfig(workerId),
        config.shardCheckpointConfig,
        stats
      ).viaMat(KillSwitches.single)(Keep.both)
    }

    // Best-effort teardown: errors are dropped on purpose so they never mask the test outcome.
    def shutdownQuietly(): Unit = {
      Try {
        // Zero timeout: the materialized value is not waited for. If it is not available yet,
        // Await.result throws and the rest of the shutdown is skipped.
        val (terminated, switch) = Await.result(materialized, 0.second)
        switch.shutdown()
        Await.ready(terminated, KinesisResourceManager.WorkerTerminationTimeout)
      }
      ()
    }

    val outcome = Try(closure(source, stats))
    shutdownQuietly()
    outcome.get
  }