in src/main/scala/com/gu/kinesis/RecordProcessorImpl.scala [113:126]
override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
try {
val records = processRecordsInput.records().asScala.toIndexedSeq
val kinesisRecords = records.map(KinesisRecord.fromMutableRecord)
shardCheckpointTracker.watchForCompletion(kinesisRecords)
recordCheckpointerStats()
if (shardCheckpointTracker.shouldCheckpoint) checkpointAndHandleErrors(processRecordsInput.checkpointer())
if (kinesisRecords.nonEmpty) blockToEnqueueAndHandleResult(kinesisRecords)
} catch {
case NonFatal(e) =>
log.error("Unhandled exception in `processRecords()`, failing the streaming...", e)
streamKillSwitch.abort(e)
}
}