in src/main/scala/com/gu/kinesis/RecordProcessorImpl.scala [185:209]
private def blockToEnqueueAndHandleResult(kinesisRecords: IndexedSeq[KinesisRecord]): Unit = {
try {
kinesisRecords.foreach(consumerStats.trackRecord(shardConsumerId, _))
val enqueueFuture = consumerStats.trackBatchEnqueue(shardConsumerId, kinesisRecords.size) {
queue.offer(kinesisRecords)
}
Await.result(enqueueFuture, Duration.Inf) match {
case QueueOfferResult.Enqueued =>
// Do nothing.
case QueueOfferResult.QueueClosed =>
// Do nothing.
case QueueOfferResult.Dropped =>
streamKillSwitch.abort(
new AssertionError("RecordProcessor source queue must use `OverflowStrategy.Backpressure`.")
)
case QueueOfferResult.Failure(e) =>
streamKillSwitch.abort(e)
}
} catch {
case NonFatal(e) => streamKillSwitch.abort(e)
}
}