in pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/StreamsRecordProcessor.java [54:82]
/**
 * Checkpoints the current shard position, retrying on transient failures.
 *
 * <p>Up to {@code numRetries} attempts are made, sleeping {@code backoffTime}
 * milliseconds between attempts. The method never throws: every failure mode is
 * logged and the checkpoint is abandoned.
 *
 * <ul>
 *   <li>{@code ShutdownException}: logged at info level, no retry.</li>
 *   <li>{@code InvalidStateException}: logged as an error, no retry.</li>
 *   <li>{@code ThrottlingException} / {@code KinesisClientLibDependencyException}:
 *       treated as transient and retried until the attempts are exhausted.</li>
 * </ul>
 *
 * <p>If the thread is interrupted while backing off, its interrupt status is
 * restored and no further attempts are made.
 *
 * @param checkpointer the checkpointer on which {@code checkpoint()} is invoked
 */
private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
    log.info("Checkpointing shard " + kinesisShardId);
    for (int i = 0; i < numRetries; i++) {
        try {
            checkpointer.checkpoint();
            return;
        } catch (ShutdownException se) {
            // Ignore checkpoint if the processor instance has been shutdown.
            log.info("Caught shutdown exception, skipping checkpoint.", se);
            return;
        } catch (InvalidStateException e) {
            // Retrying cannot help here, so give up immediately.
            log.error("Cannot save checkpoint to the DynamoDB table.", e);
            return;
        } catch (ThrottlingException | KinesisClientLibDependencyException e) {
            // Back off and re-attempt checkpoint upon transient failures.
            if (i >= (numRetries - 1)) {
                log.error("Checkpoint failed after " + (i + 1) + " attempts.", e);
                return;
            }
            log.info("Transient issue when checkpointing - attempt " + (i + 1)
                    + " of " + numRetries, e);
        }

        // Only reached after a transient failure that still has attempts left.
        try {
            Thread.sleep(backoffTime);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe it, and stop
            // retrying: a further sleep would throw again immediately.
            log.warn("Interrupted while backing off before checkpoint retry for shard "
                    + kinesisShardId + ", abandoning checkpoint.", e);
            Thread.currentThread().interrupt();
            return;
        }
    }
}