in src/main/scala/com/gu/contentapi/firehose/kinesis/SingleEventProcessor.scala [62:76]
/** Processes a batch of events.
  *
  * Declared here without a body, so the actual handling is supplied elsewhere
  * (presumably by a concrete subclass — confirm against the enclosing type).
  *
  * @param events the events to process
  */
protected def processEvents(events: Seq[EventT]): Unit
/** Decides whether a checkpoint is due.
  *
  * A checkpoint is due once at least `maxCheckpointBatchSize` records have been
  * processed since the last checkpoint, or once more than `checkpointInterval`
  * has elapsed since `lastCheckpointedAt`.
  *
  * @return `true` if either the record-count or the elapsed-time threshold is met
  */
private def shouldCheckpointNow: Boolean =
  recordsProcessedSinceCheckpoint.get() >= maxCheckpointBatchSize || {
    val lastCheckpointNanos = lastCheckpointedAt.get()
    // System.nanoTime() has an arbitrary (possibly negative) origin and may wrap,
    // so two readings must be compared via their difference rather than directly.
    System.nanoTime() - lastCheckpointNanos > checkpointInterval.toNanos
  }
/** Records our current stream position and restarts the checkpoint bookkeeping.
  *
  * @param checkpointer used to store our latest position in the stream
  */
private def checkpoint(checkpointer: RecordProcessorCheckpointer) = {
  // Persist progress first; the bookkeeping below is only reset afterwards.
  checkpointer.checkpoint()

  // Begin a fresh interval and a fresh batch, measured from this moment.
  val checkpointedAtNanos = System.nanoTime()
  lastCheckpointedAt.set(checkpointedAtNanos)
  recordsProcessedSinceCheckpoint.set(0)
}