in flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java [209:249]
public SubscribeToShardEvent nextEvent() {
    // Take the failure recorded for this subscription (if any) and clear the slot, so that
    // each recorded failure is handled by exactly one call.
    final Throwable pendingFailure = subscriptionException.getAndSet(null);

    // No failure recorded: hand out whatever polling the event queue yields, but only once
    // the subscription has been flagged as active.
    if (pendingFailure == null) {
        if (subscriptionActive.get()) {
            return eventQueue.poll();
        }
        LOG.debug(
                "Subscription to shard {} for consumer {} is not yet active. Skipping.",
                shardId,
                consumerArn);
        return null;
    }

    // The consumer is still activating; report "nothing yet" so the caller waits.
    final boolean consumerStillActivating =
            ExceptionUtils.findThrowable(pendingFailure, ResourceInUseException.class)
                    .isPresent();
    if (consumerStillActivating) {
        return null;
    }

    // ResourceNotFoundException is rethrown unwrapped on purpose: it is handled by a
    // try-catch loop, which would not see it if it were wrapped.
    if (pendingFailure instanceof ResourceNotFoundException) {
        throw (ResourceNotFoundException) pendingFailure;
    }

    // Look for the first entry of RECOVERABLE_EXCEPTIONS that findThrowable can locate in
    // the recorded failure.
    final Optional<? extends Throwable> recoverableCause =
            RECOVERABLE_EXCEPTIONS.stream()
                    .map(candidate -> ExceptionUtils.findThrowable(pendingFailure, candidate))
                    .filter(Optional::isPresent)
                    .map(Optional::get)
                    .findFirst();

    // Nothing recoverable was found: log and surface the failure wrapped in the source's
    // own exception type, keeping the original as the cause.
    if (!recoverableCause.isPresent()) {
        final String message = "Subscription encountered unrecoverable exception.";
        LOG.error(message, pendingFailure);
        throw new KinesisStreamsSourceException(message, pendingFailure);
    }

    // Recoverable: cancel the current subscriber, activate the subscription again and
    // return no event for this call.
    LOG.warn(
            "Recoverable exception encountered while subscribing to shard. Ignoring.",
            recoverableCause.get());
    shardSubscriber.cancel();
    activateSubscription();
    return null;
}