in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/internals/publisher/fanout/FanOutShardSubscriber.java [304:345]
private boolean consumeAllRecordsFromKinesisShard(
final Consumer<SubscribeToShardEvent> eventConsumer,
final FanOutShardSubscription subscription) throws InterruptedException, FanOutSubscriberException {
String continuationSequenceNumber;
boolean result = true;
do {
FanOutSubscriptionEvent subscriptionEvent;
if (subscriptionErrorEvent.get() != null) {
subscriptionEvent = subscriptionErrorEvent.get();
} else {
// Read timeout will occur after 30 seconds, add a sanity timeout here to prevent lockup
subscriptionEvent = queue.poll(queueWaitTimeout.toMillis(), MILLISECONDS);
}
if (subscriptionEvent == null) {
LOG.debug("Timed out polling events from network, reacquiring subscription - {} ({})", shardId, consumerArn);
result = false;
break;
} else if (subscriptionEvent.isSubscribeToShardEvent()) {
// Request for KDS to send the next record batch
subscription.requestRecord();
SubscribeToShardEvent event = subscriptionEvent.getSubscribeToShardEvent();
continuationSequenceNumber = event.continuationSequenceNumber();
if (!event.records().isEmpty()) {
eventConsumer.accept(event);
}
} else if (subscriptionEvent.isSubscriptionComplete()) {
// The subscription is complete, but the shard might not be, so we return incomplete
result = false;
break;
} else {
handleError(subscriptionEvent.getThrowable());
result = false;
break;
}
} while (continuationSequenceNumber != null);
subscription.cancelSubscription();
return result;
}