in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/internals/publisher/fanout/FanOutRecordPublisher.java [135:180]
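/**
 * Subscribes to the shard and consumes records, backing off on retryable errors.
 * Returns CANCELLED if the consuming thread is interrupted, COMPLETE when the shard
 * has been fully consumed (or the shard/consumer no longer exists), and INCOMPLETE
 * when the caller should invoke this method again to resume or retry consumption.
 * A RuntimeException is thrown once the configured retry limit is exceeded.
 *
 * @param eventConsumer the consumer to pass received SubscribeToShardEvents to
 * @return the result of this run
 * @throws InterruptedException if the thread is interrupted
 */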
private RecordPublisherRunResult runWithBackoff(
        final Consumer<SubscribeToShardEvent> eventConsumer) throws InterruptedException {
    FanOutShardSubscriber fanOutShardSubscriber = new FanOutShardSubscriber(
            consumerArn,
            subscribedShard.getShard().getShardId(),
            kinesisProxy,
            configuration.getSubscribeToShardTimeout());
    boolean complete;

    try {
        complete = fanOutShardSubscriber.subscribeToShardAndConsumeRecords(
                toSdkV2StartingPosition(nextStartingPosition), eventConsumer);
        // A successful subscription resets the retry counter
        attempt = 0;
    } catch (FanOutSubscriberInterruptedException ex) {
        LOG.info("Thread interrupted, closing record publisher for shard {}.",
                subscribedShard.getShard().getShardId(), ex);
        return CANCELLED;
    } catch (RecoverableFanOutSubscriberException ex) {
        // Recoverable errors should be reattempted without contributing to the retry policy;
        // a recoverable error does not result in the Flink job being cancelled
        backoff(ex);
        return INCOMPLETE;
    } catch (FanOutSubscriberException ex) {
        // We have received an error from the network layer.
        // This can be due to limits being exceeded, network timeouts, etc.
        // We should back off, reacquire a subscription and try again.
        if (ex.getCause() instanceof ResourceNotFoundException) {
            LOG.warn("Received ResourceNotFoundException. Either the shard does not exist, or the stream subscriber has been deregistered. " +
                    "Marking this shard as complete {} ({})", subscribedShard.getShard().getShardId(), consumerArn);

            return COMPLETE;
        }
        if (attempt == configuration.getSubscribeToShardMaxRetries()) {
            final String errorMessage = "Maximum retries exceeded for SubscribeToShard. " +
                    "Failed " + configuration.getSubscribeToShardMaxRetries() + " times.";
            LOG.error(errorMessage, ex.getCause());
            throw new RuntimeException(errorMessage, ex.getCause());
        }

        attempt++;
        backoff(ex);
        return INCOMPLETE;
    }

    return complete ? COMPLETE : INCOMPLETE;
}
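
// --- Illustrative sketch (not part of the original file) ---
// The backoff(ex) helper used above is not shown in this excerpt. As a rough
// sketch of the kind of wait it is expected to compute, the method below
// implements a capped, full-jitter exponential backoff. The parameter names
// and the use of ThreadLocalRandom are assumptions for illustration only;
// the actual delay calculation lives in the connector's backoff utility and
// is driven by the SubscribeToShard backoff settings in the configuration.
private static long fullJitterBackoffMillis(
        final long baseMillis,
        final long maxMillis,
        final double expConstant,
        final int attempt) {
    // Grow the delay exponentially with each attempt, capped at maxMillis
    final double capped = Math.min(maxMillis, baseMillis * Math.pow(expConstant, attempt));
    // Apply full jitter: pick a uniformly random delay in [0, capped)
    return (long) (java.util.concurrent.ThreadLocalRandom.current().nextDouble() * capped);
}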