in src/main/scala/com/gu/kinesis/RecordProcessorImpl.scala [211:241]
/**
  * Attempts a checkpoint for this shard consumer and handles checkpoint failures locally, so that
  * non-fatal errors never propagate to the caller.
  *
  * Two checkpoint modes are used:
  *  - if `shardEnd` is set and the tracker reports that all in-flight records have been processed,
  *    the argument-less `checkpoint()` is invoked;
  *  - otherwise the checkpoint tracker is asked to checkpoint the last processed record, using that
  *    record's sequence number and sub-sequence number (defaulting to `0` when absent).
  *
  * Failure handling:
  *  - `ShutdownException`: the checkpoint is skipped; the event is only logged.
  *  - `ThrottlingException` / `KinesisClientLibDependencyException`: reported as delayed and logged.
  *    Nothing is retried inside this method; the retry is presumably driven by a later invocation
  *    (TODO confirm against the caller).
  *  - any other non-fatal error: reported as failed and the stream is aborted via the kill switch.
  *
  * @param checkpointer checkpointer handed over by the Kinesis client library
  * @param shardEnd     whether the shard has been fully consumed (defaults to `false`)
  */
private def checkpointAndHandleErrors(checkpointer: RecordProcessorCheckpointer, shardEnd: Boolean = false): Unit = {
  try {
    if (shardEnd && shardCheckpointTracker.allInFlightRecordsProcessed) {
      // Checkpointing the actual offset is not enough. Instead, we are required to use the checkpoint()
      // method without arguments, which is not covered by Kinesis documentation.
      checkpointer.checkpoint()
      consumerStats.checkpointShardEndAcked(shardConsumerId)
      log.info(s"Successfully checkpointed $shardConsumerId at SHARD_END.")
    } else {
      // Also reached when shardEnd is set but some records are still in flight: in that case only
      // the progress made so far is checkpointed.
      shardCheckpointTracker.checkpointLastProcessedRecord { kinesisRecord =>
        val seqNumber = kinesisRecord.sequenceNumber
        val subSeqNumber = kinesisRecord.subSequenceNumber.getOrElse(0L)
        checkpointer.checkpoint(seqNumber, subSeqNumber)
        consumerStats.checkpointAcked(shardConsumerId)
        log.info(s"Successfully checkpointed $shardConsumerId at ${kinesisRecord.offsetString}.")
      }
    }
  } catch {
    case e: ShutdownException =>
      // Deliberately neither rethrown nor treated as a stream failure. Leave a trace in the logs
      // so that an abandoned checkpoint is not completely invisible to operators.
      log.info(s"Skipped checkpointing $shardConsumerId, record processor has been shut down: ${e.getMessage}")
    case e @ (_: ThrottlingException | _: KinesisClientLibDependencyException) =>
      consumerStats.checkpointDelayed(shardConsumerId, e)
      log.error(s"Failed to checkpoint $shardConsumerId, will retry later.", e)
    case NonFatal(e) =>
      consumerStats.checkpointFailed(shardConsumerId, e)
      log.error(s"Failed to checkpoint $shardConsumerId, failing the streaming...", e)
      streamKillSwitch.abort(e)
  }
}