in src/main/java/com/amazonaws/kinesisvideo/parser/kinesis/KinesisRecordProcessor.java [230:260]
/**
 * Checkpoints progress for this shard, retrying when the attempt is throttled.
 *
 * <p>Up to {@code NUM_RETRIES} attempts are made, sleeping {@code BACKOFF_TIME_IN_MILLIS}
 * between attempts. Retrying stops early when:
 * <ul>
 *   <li>the checkpoint succeeds;</li>
 *   <li>a {@code ShutdownException} is caught (the checkpoint is skipped);</li>
 *   <li>an {@code InvalidStateException} is caught (logged as an error, not retried);</li>
 *   <li>the thread is interrupted during the backoff sleep (interrupt status is restored).</li>
 * </ul>
 * No exception from the handled types is propagated to the caller; failures are only logged.
 *
 * @param checkpointer checkpointer used to record progress for the shard
 */
private void checkpoint(final IRecordProcessorCheckpointer checkpointer) {
    LOG.info("Checkpointing shard " + kinesisShardId);
    for (int i = 0; i < NUM_RETRIES; i++) {
        try {
            checkpointer.checkpoint();
            return;
        } catch (final ShutdownException se) {
            // Ignore checkpoint if the processor instance has been shutdown (fail over).
            LOG.info("Caught shutdown exception, skipping checkpoint.", se);
            return;
        } catch (final ThrottlingException e) {
            // Backoff and re-attempt checkpoint upon transient failures
            if (i >= (NUM_RETRIES - 1)) {
                LOG.error("Checkpoint failed after " + (i + 1) + " attempts.", e);
                return;
            }
            LOG.info("Transient issue when checkpointing - attempt " + (i + 1) + " of "
                    + NUM_RETRIES, e);
        } catch (final InvalidStateException e) {
            // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
            LOG.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
            return;
        }

        // Only reached after a throttled attempt that still has retries left.
        try {
            Thread.sleep(BACKOFF_TIME_IN_MILLIS);
        } catch (final InterruptedException e) {
            LOG.debug("Interrupted sleep", e);
            // Restore the interrupt status so callers can observe it, and stop retrying:
            // with the flag set, any further sleep would throw immediately and the
            // remaining attempts would run back-to-back with no backoff.
            Thread.currentThread().interrupt();
            return;
        }
    }
}