private def waitForInFlightRecordsUnlessStreamFailed()

in src/main/scala/com/gu/kinesis/RecordProcessorImpl.scala [259:267]


  /** Blocks until all in-flight records have been processed, unless the stream has already failed.
    *
    * If `streamTerminationFuture` has already completed with a failure, this returns
    * immediately without waiting. Otherwise (stream still running, or completed
    * successfully) it blocks the calling thread for at most `waitDuration` on
    * `shardCheckpointTracker.allInFlightRecordsProcessedFuture`.
    *
    * The wait is best-effort: its outcome is wrapped in a `Try` and discarded, so a
    * timeout or any other non-fatal failure of the wait is not reported to the caller.
    *
    * @param waitDuration upper bound on how long to block waiting for in-flight records
    */
  private def waitForInFlightRecordsUnlessStreamFailed(waitDuration: Duration): Unit = {
    // NOTE(review): nothing visible in this method obviously needs an ExecutionContext;
    // presumably `allInFlightRecordsProcessedFuture` takes one implicitly — confirm before removing.
    import scala.concurrent.ExecutionContext.Implicits.global

    // `value` is None while the future is still pending, so only an already-completed,
    // failed termination future counts as "stream failed".
    val streamAlreadyFailed = streamTerminationFuture.value.exists(_.isFailure)

    if (!streamAlreadyFailed) {
      // Result intentionally ignored: we only want to give in-flight records time to finish.
      Try {
        Await.result(shardCheckpointTracker.allInFlightRecordsProcessedFuture, waitDuration)
      }
    }
  }