pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/StreamsRecordProcessor.java [55:85]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        log.info("Checkpointing shard " + kinesisShardId);
        
        for (int i = 0; i < numRetries; i++) {
            try {
                checkpointer.checkpoint();
                break;
            } catch (ShutdownException se) {
                // Ignore checkpoint if the processor instance has been shut down.
                log.info("Caught shutdown exception, skipping checkpoint.", se);
                break;
            } catch (InvalidStateException e) {
                log.error("Cannot save checkpoint to the DynamoDB table.", e);
                break;
            } catch (ThrottlingException | KinesisClientLibDependencyException e) {
                // Back off and re-attempt checkpoint upon transient failures
                if (i >= (numRetries - 1)) {
                    log.error("Checkpoint failed after " + (i + 1) + "attempts.", e);
                    break;
                }
            }

            try {
                Thread.sleep(backoffTime);
            } catch (InterruptedException e) {
                log.debug("Interrupted sleep", e);
            }
        }
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecordProcessor.java [53:83]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        log.info("Checkpointing shard " + kinesisShardId);
        
        for (int i = 0; i < numRetries; i++) {
            try {
                checkpointer.checkpoint();
                break;
            } catch (ShutdownException se) {
                // Ignore checkpoint if the processor instance has been shut down.
                log.info("Caught shutdown exception, skipping checkpoint.", se);
                break;
            } catch (InvalidStateException e) {
                log.error("Cannot save checkpoint to the DynamoDB table.", e);
                break;
            } catch (ThrottlingException | KinesisClientLibDependencyException e) {
                // Back off and re-attempt checkpoint upon transient failures
                if (i >= (numRetries - 1)) {
                    log.error("Checkpoint failed after " + (i + 1) + "attempts.", e);
                    break;
                }
            }

            try {
                Thread.sleep(backoffTime);
            } catch (InterruptedException e) {
                log.debug("Interrupted sleep", e);
            }
        }
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



