private boolean consumeAllRecordsFromKinesisShard()

in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/internals/publisher/fanout/FanOutShardSubscriber.java [304:345]


	/**
	 * Consumes subscription events until the shard reports no continuation sequence number,
	 * the subscription ends, a poll times out, or an error event is received.
	 * The subscription is always cancelled before this method exits, including when it exits
	 * by throwing.
	 *
	 * @param eventConsumer receives every {@link SubscribeToShardEvent} that carries at least one record
	 * @param subscription the active subscription; the next batch is requested from it after each
	 *                     event and it is cancelled when this method exits
	 * @return {@code true} if an event with a {@code null} continuation sequence number was received;
	 *         {@code false} if polling timed out, the subscription completed, or an error event was
	 *         handled without {@code handleError} throwing
	 * @throws InterruptedException if the thread is interrupted while waiting for the next event
	 * @throws FanOutSubscriberException declared by this method; presumably raised by
	 *         {@code handleError} for error events (its body is not visible here)
	 */
	private boolean consumeAllRecordsFromKinesisShard(
			final Consumer<SubscribeToShardEvent> eventConsumer,
			final FanOutShardSubscription subscription) throws InterruptedException, FanOutSubscriberException {
		try {
			String continuationSequenceNumber;

			do {
				// Read the error reference exactly once so that the null check and the value
				// that is subsequently processed cannot come from two different reads.
				FanOutSubscriptionEvent subscriptionEvent = subscriptionErrorEvent.get();
				if (subscriptionEvent == null) {
					// Read timeout will occur after 30 seconds, add a sanity timeout here to prevent lockup
					subscriptionEvent = queue.poll(queueWaitTimeout.toMillis(), MILLISECONDS);
				}

				if (subscriptionEvent == null) {
					LOG.debug("Timed out polling events from network, reacquiring subscription - {} ({})", shardId, consumerArn);
					return false;
				} else if (subscriptionEvent.isSubscribeToShardEvent()) {
					// Request for KDS to send the next record batch
					subscription.requestRecord();

					SubscribeToShardEvent event = subscriptionEvent.getSubscribeToShardEvent();
					continuationSequenceNumber = event.continuationSequenceNumber();
					if (!event.records().isEmpty()) {
						eventConsumer.accept(event);
					}
				} else if (subscriptionEvent.isSubscriptionComplete()) {
					// The subscription is complete, but the shard might not be, so we return incomplete
					return false;
				} else {
					handleError(subscriptionEvent.getThrowable());
					// Only reached if handleError returns normally; report the shard as incomplete
					return false;
				}
			} while (continuationSequenceNumber != null);

			// A null continuation sequence number ended the loop: nothing further to read
			return true;
		} finally {
			// Cancel on every exit path (normal return, interruption, consumer failure, error event)
			// so the subscription is not left open when an exception propagates to the caller.
			subscription.cancelSubscription();
		}
	}