private RecordPublisherRunResult consumeAllRecordsFromKinesisShard()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java [385:434]


    /**
     * Consumes subscription events in a loop until the shard is reported as finished, the
     * subscriber is cancelled, or the subscription has to be re-acquired.
     *
     * <p>On every iteration the pending error event (if one has been recorded in {@code
     * subscriptionErrorEvent}) takes precedence over the queue; otherwise the queue is polled for
     * at most {@code queueWaitTimeout}. The outcome of each iteration is:
     *
     * <ul>
     *   <li>{@code runningSupplier} reports {@code false}: returns {@code CANCELLED} immediately,
     *       without cancelling the subscription here.
     *   <li>No event arrived before the timeout: cancels the subscription and returns {@code
     *       INCOMPLETE}.
     *   <li>A {@link SubscribeToShardEvent} arrived: requests the next batch, hands the event to
     *       {@code eventConsumer} when it contains records, and keeps looping while the event's
     *       continuation sequence number is non-null. A null continuation sequence number ends
     *       the loop, cancels the subscription and returns {@code COMPLETE}.
     *   <li>A subscription-complete event arrived: returns {@code INCOMPLETE} immediately,
     *       without cancelling the subscription here.
     *   <li>Any other event: its throwable is passed to {@code handleErrorAndRethrow}; should
     *       that call return normally, the subscription is cancelled and {@code INCOMPLETE} is
     *       returned.
     * </ul>
     *
     * @param eventConsumer receives each {@link SubscribeToShardEvent} that has a non-empty
     *     record list
     * @param subscription the subscription used to request further batches and to cancel
     * @return {@code COMPLETE}, {@code INCOMPLETE} or {@code CANCELLED} as described above
     * @throws InterruptedException presumably when the thread is interrupted while waiting on the
     *     event queue - confirm against the queue type
     * @throws FanOutSubscriberException presumably raised by {@code handleErrorAndRethrow} for an
     *     error event - confirm against that helper
     */
    private RecordPublisherRunResult consumeAllRecordsFromKinesisShard(
            final Consumer<SubscribeToShardEvent> eventConsumer,
            final FanOutShardSubscription subscription)
            throws InterruptedException, FanOutSubscriberException {
        String continuationSequenceNumber;
        RecordPublisherRunResult result = COMPLETE;

        do {
            if (!runningSupplier.get()) {
                LOG.info("FanOutShardSubscriber cancelled - {} ({})", shardId, consumerArn);
                return CANCELLED;
            }

            // Read the shared error holder exactly once: checking it and then reading it again
            // could observe two different values, and a null on the second read would be
            // misreported below as a poll timeout.
            final FanOutSubscriptionEvent pendingErrorEvent = subscriptionErrorEvent.get();

            final FanOutSubscriptionEvent subscriptionEvent;
            if (pendingErrorEvent != null) {
                subscriptionEvent = pendingErrorEvent;
            } else {
                // Read timeout occurs after 30 seconds, add a sanity timeout to prevent lockup
                subscriptionEvent = queue.poll(queueWaitTimeout.toMillis(), MILLISECONDS);
            }

            if (subscriptionEvent == null) {
                // poll() gave up waiting: stop and let the caller obtain a new subscription
                LOG.info(
                        "Timed out polling events from network, reacquiring subscription - {} ({})",
                        shardId,
                        consumerArn);
                result = INCOMPLETE;
                break;
            } else if (subscriptionEvent.isSubscribeToShardEvent()) {
                // Request for KDS to send the next record batch
                subscription.requestRecord();

                SubscribeToShardEvent event = subscriptionEvent.getSubscribeToShardEvent();
                continuationSequenceNumber = event.continuationSequenceNumber();
                // Events without records are not forwarded to the consumer
                if (!event.records().isEmpty()) {
                    eventConsumer.accept(event);
                }
            } else if (subscriptionEvent.isSubscriptionComplete()) {
                // The subscription is complete, but the shard might not be, so we return incomplete
                return INCOMPLETE;
            } else {
                handleErrorAndRethrow(subscriptionEvent.getThrowable());
                // Only reached if the handler returns normally; treat as a retryable outcome
                result = INCOMPLETE;
                break;
            }
        } while (continuationSequenceNumber != null);

        subscription.cancelSubscription();
        return result;
    }