public SubscribeToShardEvent nextEvent()

in flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java [209:249]


    /**
     * Polls for the next event of this shard subscription without blocking.
     *
     * <p>Any exception recorded in {@code subscriptionException} is consumed (the holder is reset
     * to {@code null}) and handled before the event queue is consulted:
     *
     * <ul>
     *   <li>a {@code ResourceInUseException} anywhere in the cause chain yields {@code null};
     *   <li>a top-level {@code ResourceNotFoundException} is rethrown as-is;
     *   <li>an exception matching one of {@code RECOVERABLE_EXCEPTIONS} is logged, the current
     *       subscriber is cancelled, the subscription is activated again and {@code null} is
     *       returned;
     *   <li>anything else is logged and wrapped in a {@code KinesisStreamsSourceException}.
     * </ul>
     *
     * @return the head of the event queue, or {@code null} when the queue is empty, the
     *     subscription is not active yet, or a tolerated exception was just consumed
     * @throws ResourceNotFoundException if that exact exception was recorded by the subscription
     * @throws KinesisStreamsSourceException if the recorded exception is not recoverable
     */
    public SubscribeToShardEvent nextEvent() {
        final Throwable failure = subscriptionException.getAndSet(null);

        // Happy path: nothing failed since the last call.
        if (failure == null) {
            if (subscriptionActive.get()) {
                return eventQueue.poll();
            }
            LOG.debug(
                    "Subscription to shard {} for consumer {} is not yet active. Skipping.",
                    shardId,
                    consumerArn);
            return null;
        }

        // The consumer is still being activated; report "no event" and let the caller retry.
        if (ExceptionUtils.findThrowable(failure, ResourceInUseException.class).isPresent()) {
            return null;
        }

        // Rethrown without wrapping because it is handled separately through a try-catch
        // (per the original author's note).
        if (failure instanceof ResourceNotFoundException) {
            throw (ResourceNotFoundException) failure;
        }

        // First recoverable exception type found in the cause chain, in the iteration order of
        // RECOVERABLE_EXCEPTIONS.
        final Optional<? extends Throwable> recoverableCause =
                RECOVERABLE_EXCEPTIONS.stream()
                        .map(exceptionType -> ExceptionUtils.findThrowable(failure, exceptionType))
                        .filter(Optional::isPresent)
                        .map(Optional::get)
                        .findFirst();

        if (!recoverableCause.isPresent()) {
            LOG.error("Subscription encountered unrecoverable exception.", failure);
            throw new KinesisStreamsSourceException(
                    "Subscription encountered unrecoverable exception.", failure);
        }

        LOG.warn(
                "Recoverable exception encountered while subscribing to shard. Ignoring.",
                recoverableCause.get());
        // Drop the failed subscriber and start a fresh subscription; events (if any) will be
        // picked up on a later call.
        shardSubscriber.cancel();
        activateSubscription();
        return null;
    }