private RecordPublisherRunResult runWithBackoff()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java [163:220]


    private RecordPublisherRunResult runWithBackoff(
            final Consumer<SubscribeToShardEvent> eventConsumer) throws InterruptedException {
        FanOutShardSubscriber fanOutShardSubscriber =
                new FanOutShardSubscriber(
                        consumerArn,
                        subscribedShard.getShard().getShardId(),
                        kinesisProxy,
                        configuration.getSubscribeToShardTimeout(),
                        runningSupplier);
        RecordPublisherRunResult result;

        try {
            result =
                    fanOutShardSubscriber.subscribeToShardAndConsumeRecords(
                            toSdkV2StartingPosition(nextStartingPosition), eventConsumer);
            attempt = 0;
        } catch (FanOutSubscriberInterruptedException ex) {
            LOG.info(
                    "Thread interrupted, closing record publisher for shard {}.",
                    subscribedShard.getShard().getShardId(),
                    ex);
            return CANCELLED;
        } catch (RecoverableFanOutSubscriberException ex) {
            // Recoverable errors should be reattempted without contributing to the retry policy
            // A recoverable error would not result in the Flink job being cancelled
            backoff(ex);
            return INCOMPLETE;
        } catch (FanOutSubscriberException ex) {
            // We have received an error from the network layer
            // This can be due to limits being exceeded, network timeouts, etc
            // We should backoff, reacquire a subscription and try again
            if (ex.getCause() instanceof ResourceNotFoundException) {
                LOG.warn(
                        "Received ResourceNotFoundException. Either the shard does not exist, or the stream subscriber has been deregistered."
                                + "Marking this shard as complete {} ({})",
                        subscribedShard.getShard().getShardId(),
                        consumerArn);

                return COMPLETE;
            }

            if (attempt == configuration.getSubscribeToShardMaxRetries()) {
                final String errorMessage =
                        "Maximum retries exceeded for SubscribeToShard. "
                                + "Failed "
                                + configuration.getSubscribeToShardMaxRetries()
                                + " times.";
                LOG.error(errorMessage, ex.getCause());
                throw new RuntimeException(errorMessage, ex.getCause());
            }

            attempt++;
            backoff(ex);
            return INCOMPLETE;
        }

        return result;
    }