in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java [219:277]
private FanOutShardSubscription openSubscriptionToShard(final StartingPosition startingPosition)
throws FanOutSubscriberException, InterruptedException {
SubscribeToShardRequest request =
SubscribeToShardRequest.builder()
.consumerARN(consumerArn)
.shardId(shardId)
.startingPosition(startingPosition)
.build();
AtomicReference<Throwable> exception = new AtomicReference<>();
CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1);
FanOutShardSubscription subscription =
new FanOutShardSubscription(waitForSubscriptionLatch);
SubscribeToShardResponseHandler responseHandler =
SubscribeToShardResponseHandler.builder()
.onError(
e -> {
// Errors that occur while trying to acquire a subscription are
// only thrown from here
// Errors that occur during the subscription are surfaced here
// and to the FanOutShardSubscription
// (errors are ignored here once the subscription is open)
if (waitForSubscriptionLatch.getCount() > 0) {
exception.set(e);
waitForSubscriptionLatch.countDown();
}
})
.subscriber(() -> subscription)
.build();
kinesis.subscribeToShard(request, responseHandler);
boolean subscriptionEstablished =
waitForSubscriptionLatch.await(
subscribeToShardTimeout.toMillis(), TimeUnit.MILLISECONDS);
if (!subscriptionEstablished) {
final String errorMessage =
"Timed out acquiring subscription - " + shardId + " (" + consumerArn + ")";
LOG.error(errorMessage);
subscription.cancelSubscription();
handleError(
new RecoverableFanOutSubscriberException(new TimeoutException(errorMessage)));
}
Throwable throwable = exception.get();
if (throwable != null) {
handleError(throwable);
}
LOG.debug("Acquired subscription - {} ({})", shardId, consumerArn);
// Request the first record to kick off consumption
// Following requests are made by the FanOutShardSubscription on the netty thread
subscription.requestRecord();
return subscription;
}