in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java [163:220]
/**
 * Runs one SubscribeToShard attempt for the subscribed shard and translates the outcome into a
 * {@link RecordPublisherRunResult}.
 *
 * <p>When the subscriber returns normally, the retry counter ({@code attempt}) is reset to zero
 * and the subscriber's result is returned unchanged. Failures are handled as follows:
 *
 * <ul>
 *   <li>{@code FanOutSubscriberInterruptedException}: logged, {@code CANCELLED} is returned.
 *   <li>{@code RecoverableFanOutSubscriberException}: backs off and returns {@code INCOMPLETE}
 *       without incrementing the retry counter.
 *   <li>{@code FanOutSubscriberException} caused by {@code ResourceNotFoundException}: logged,
 *       {@code COMPLETE} is returned.
 *   <li>Any other {@code FanOutSubscriberException}: increments the retry counter, backs off
 *       and returns {@code INCOMPLETE}, unless the counter already equals the configured
 *       maximum, in which case a {@link RuntimeException} is thrown.
 * </ul>
 *
 * @param eventConsumer consumer handed to the shard subscriber together with the starting
 *     position
 * @return the subscriber's own result on success, otherwise {@code CANCELLED}, {@code COMPLETE}
 *     or {@code INCOMPLETE} as described above
 * @throws InterruptedException declared by this method; presumably raised while subscribing or
 *     backing off (confirm against the callee signatures)
 * @throws RuntimeException if a non-recoverable subscriber error occurs when the retry counter
 *     equals {@code configuration.getSubscribeToShardMaxRetries()}; the cause is the cause of
 *     the subscriber exception
 */
private RecordPublisherRunResult runWithBackoff(
        final Consumer<SubscribeToShardEvent> eventConsumer) throws InterruptedException {
    final String shardId = subscribedShard.getShard().getShardId();

    // A fresh subscriber is created for every invocation.
    final FanOutShardSubscriber fanOutShardSubscriber =
            new FanOutShardSubscriber(
                    consumerArn,
                    shardId,
                    kinesisProxy,
                    configuration.getSubscribeToShardTimeout(),
                    runningSupplier);

    try {
        final RecordPublisherRunResult result =
                fanOutShardSubscriber.subscribeToShardAndConsumeRecords(
                        toSdkV2StartingPosition(nextStartingPosition), eventConsumer);
        // The subscription ended without an error, so the retry budget starts over.
        attempt = 0;
        return result;
    } catch (FanOutSubscriberInterruptedException ex) {
        LOG.info("Thread interrupted, closing record publisher for shard {}.", shardId, ex);
        return CANCELLED;
    } catch (RecoverableFanOutSubscriberException ex) {
        // Recoverable errors should be reattempted without contributing to the retry policy.
        // A recoverable error would not result in the Flink job being cancelled.
        backoff(ex);
        return INCOMPLETE;
    } catch (FanOutSubscriberException ex) {
        // We have received an error from the network layer.
        // This can be due to limits being exceeded, network timeouts, etc.
        // We should backoff, reacquire a subscription and try again.
        if (ex.getCause() instanceof ResourceNotFoundException) {
            // Retrying cannot help here, so the shard is reported as finished.
            LOG.warn(
                    "Received ResourceNotFoundException. Either the shard does not exist, or the stream subscriber has been deregistered. "
                            + "Marking this shard as complete {} ({})",
                    shardId,
                    consumerArn);
            return COMPLETE;
        }

        final int maxRetries = configuration.getSubscribeToShardMaxRetries();
        if (attempt == maxRetries) {
            final String errorMessage =
                    "Maximum retries exceeded for SubscribeToShard. "
                            + "Failed "
                            + maxRetries
                            + " times.";
            LOG.error(errorMessage, ex.getCause());
            throw new RuntimeException(errorMessage, ex.getCause());
        }

        // Count this failure against the retry budget before waiting.
        attempt++;
        backoff(ex);
        return INCOMPLETE;
    }
}