override def processRecords()

in src/main/scala/com/gu/kinesis/RecordProcessorImpl.scala [113:126]


  override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
    try {
      val records = processRecordsInput.records().asScala.toIndexedSeq
      val kinesisRecords = records.map(KinesisRecord.fromMutableRecord)
      shardCheckpointTracker.watchForCompletion(kinesisRecords)
      recordCheckpointerStats()
      if (shardCheckpointTracker.shouldCheckpoint) checkpointAndHandleErrors(processRecordsInput.checkpointer())
      if (kinesisRecords.nonEmpty) blockToEnqueueAndHandleResult(kinesisRecords)
    } catch {
      case NonFatal(e) =>
        log.error("Unhandled exception in `processRecords()`, failing the streaming...", e)
        streamKillSwitch.abort(e)
    }
  }