private def checkpointAndHandleErrors()

in src/main/scala/com/gu/kinesis/RecordProcessorImpl.scala [211:241]


  private def checkpointAndHandleErrors(checkpointer: RecordProcessorCheckpointer, shardEnd: Boolean = false): Unit = {
    try {
      if (shardEnd && shardCheckpointTracker.allInFlightRecordsProcessed) {
        // Checkpointing the actual offset is not enough. Instead, we are required to use the checkpoint()
        // method without arguments, which is not covered by Kinesis documentation.
        checkpointer.checkpoint()
        consumerStats.checkpointShardEndAcked(shardConsumerId)
        log.info(s"Successfully checkpointed $shardConsumerId at SHARD_END.")
      } else {
        shardCheckpointTracker.checkpointLastProcessedRecord { kinesisRecord =>
          val seqNumber = kinesisRecord.sequenceNumber
          val subSeqNumber = kinesisRecord.subSequenceNumber.getOrElse(0L)
          checkpointer.checkpoint(seqNumber, subSeqNumber)
          consumerStats.checkpointAcked(shardConsumerId)
          log.info(s"Successfully checkpointed $shardConsumerId at ${kinesisRecord.offsetString}.")
        }
      }
    } catch {
      case e: ShutdownException =>
      // Do nothing.

      case e @ (_: ThrottlingException | _: KinesisClientLibDependencyException) =>
        consumerStats.checkpointDelayed(shardConsumerId, e)
        log.error(s"Failed to checkpoint $shardConsumerId, will retry later.", e)

      case NonFatal(e) =>
        consumerStats.checkpointFailed(shardConsumerId, e)
        log.error(s"Failed to checkpoint $shardConsumerId, failing the streaming...", e)
        streamKillSwitch.abort(e)
    }
  }