in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java [385:434]
private RecordPublisherRunResult consumeAllRecordsFromKinesisShard(
final Consumer<SubscribeToShardEvent> eventConsumer,
final FanOutShardSubscription subscription)
throws InterruptedException, FanOutSubscriberException {
String continuationSequenceNumber;
RecordPublisherRunResult result = COMPLETE;
do {
if (!runningSupplier.get()) {
LOG.info("FanOutShardSubscriber cancelled - {} ({})", shardId, consumerArn);
return CANCELLED;
}
FanOutSubscriptionEvent subscriptionEvent;
if (subscriptionErrorEvent.get() != null) {
subscriptionEvent = subscriptionErrorEvent.get();
} else {
// Read timeout occurs after 30 seconds, add a sanity timeout to prevent lockup
subscriptionEvent = queue.poll(queueWaitTimeout.toMillis(), MILLISECONDS);
}
if (subscriptionEvent == null) {
LOG.info(
"Timed out polling events from network, reacquiring subscription - {} ({})",
shardId,
consumerArn);
result = INCOMPLETE;
break;
} else if (subscriptionEvent.isSubscribeToShardEvent()) {
// Request for KDS to send the next record batch
subscription.requestRecord();
SubscribeToShardEvent event = subscriptionEvent.getSubscribeToShardEvent();
continuationSequenceNumber = event.continuationSequenceNumber();
if (!event.records().isEmpty()) {
eventConsumer.accept(event);
}
} else if (subscriptionEvent.isSubscriptionComplete()) {
// The subscription is complete, but the shard might not be, so we return incomplete
return INCOMPLETE;
} else {
handleErrorAndRethrow(subscriptionEvent.getThrowable());
result = INCOMPLETE;
break;
}
} while (continuationSequenceNumber != null);
subscription.cancelSubscription();
return result;
}