in src/main/scala/com/gu/kinesis/RecordProcessorImpl.scala [259:267]
/**
  * Waits, for at most `waitDuration`, on
  * `shardCheckpointTracker.allInFlightRecordsProcessedFuture`, but only when
  * `streamTerminationFuture` has not already completed with a failure.
  *
  * Any timeout or non-fatal exception raised while waiting is captured by `Try`
  * and discarded, so this method does not surface them to the caller.
  *
  * @param waitDuration upper bound passed to `Await.result` for the wait
  */
private def waitForInFlightRecordsUnlessStreamFailed(waitDuration: Duration): Unit = {
  // NOTE(review): presumably supplies an implicit ExecutionContext required by a
  // call below (e.g. allInFlightRecordsProcessedFuture) — confirm before removing.
  import scala.concurrent.ExecutionContext.Implicits.global

  // `value` is None while the future is pending and Some(Failure(_)) only once it
  // has completed with an error, so a pending or successful stream yields false.
  val streamFailed = streamTerminationFuture.value.exists(_.isFailure)

  if (streamFailed) ()
  else {
    Try(Await.result(shardCheckpointTracker.allInFlightRecordsProcessedFuture, waitDuration))
    ()
  }
}