private RecordPublisherRunResult runWithBackoff()

in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/internals/publisher/fanout/FanOutRecordPublisher.java [135:180]


	/**
	 * Runs one SubscribeToShard subscription for the subscribed shard and translates the outcome
	 * into a {@link RecordPublisherRunResult}.
	 *
	 * <p>A new {@link FanOutShardSubscriber} is created on every invocation. When the subscription
	 * returns without throwing, the retry counter {@code attempt} is reset to zero. When it fails,
	 * the failure is handled as follows:
	 * <ul>
	 *   <li>{@code FanOutSubscriberInterruptedException}: logged, and {@code CANCELLED} is returned.</li>
	 *   <li>{@code RecoverableFanOutSubscriberException}: backs off and returns {@code INCOMPLETE}
	 *       without incrementing the retry counter.</li>
	 *   <li>Any other {@code FanOutSubscriberException}: if the cause is a
	 *       {@code ResourceNotFoundException} the shard is reported as {@code COMPLETE}; otherwise the
	 *       retry counter is checked against the configured maximum, incremented, and the method
	 *       backs off and returns {@code INCOMPLETE}.</li>
	 * </ul>
	 *
	 * @param eventConsumer the consumer handed to the subscriber to receive each {@code SubscribeToShardEvent}
	 * @return {@code COMPLETE} when the subscriber reports completion or the resource was not found,
	 *         {@code CANCELLED} when the subscriber was interrupted, {@code INCOMPLETE} otherwise
	 * @throws InterruptedException propagated from the subscription or backoff calls made here
	 * @throws RuntimeException when the retry counter equals the configured SubscribeToShard maximum retries
	 */
	private RecordPublisherRunResult runWithBackoff(
			final Consumer<SubscribeToShardEvent> eventConsumer) throws InterruptedException {
		// Read once; the shard id is needed for the subscriber and for log messages below.
		final String shardId = subscribedShard.getShard().getShardId();

		FanOutShardSubscriber fanOutShardSubscriber = new FanOutShardSubscriber(
				consumerArn,
				shardId,
				kinesisProxy,
				configuration.getSubscribeToShardTimeout());
		boolean complete;

		try {
			complete = fanOutShardSubscriber.subscribeToShardAndConsumeRecords(
					toSdkV2StartingPosition(nextStartingPosition), eventConsumer);
			// The subscription returned normally, so earlier failures no longer count towards the limit.
			attempt = 0;
		} catch (FanOutSubscriberInterruptedException ex) {
			LOG.info("Thread interrupted, closing record publisher for shard {}.", shardId, ex);
			return CANCELLED;
		} catch (RecoverableFanOutSubscriberException ex) {
			// Recoverable errors should be reattempted without contributing to the retry policy
			// A recoverable error would not result in the Flink job being cancelled
			backoff(ex);
			return INCOMPLETE;
		} catch (FanOutSubscriberException ex) {
			// We have received an error from the network layer
			// This can be due to limits being exceeded, network timeouts, etc
			// We should backoff, reacquire a subscription and try again
			if (ex.getCause() instanceof ResourceNotFoundException) {
				LOG.warn("Received ResourceNotFoundException. Either the shard does not exist, or the stream subscriber has been deregistered. " +
					"Marking this shard as complete {} ({})", shardId, consumerArn);

				return COMPLETE;
			}

			// NOTE(review): this equality check only fires if attempt advances one step at a time
			// from zero and the configured maximum is non-negative - confirm the configuration
			// validates that, otherwise retries would never be capped.
			if (attempt == configuration.getSubscribeToShardMaxRetries()) {
				final String errorMessage = "Maximum retries exceeded for SubscribeToShard. " +
					"Failed " + configuration.getSubscribeToShardMaxRetries() + " times.";
				LOG.error(errorMessage, ex.getCause());
				throw new RuntimeException(errorMessage, ex.getCause());
			}

			attempt++;
			backoff(ex);
			return INCOMPLETE;
		}

		return complete ? COMPLETE : INCOMPLETE;
	}