in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/internals/publisher/fanout/FanOutShardSubscriber.java [194:243]
private FanOutShardSubscription openSubscriptionToShard(final StartingPosition startingPosition) throws FanOutSubscriberException, InterruptedException {
SubscribeToShardRequest request = SubscribeToShardRequest.builder()
.consumerARN(consumerArn)
.shardId(shardId)
.startingPosition(startingPosition)
.build();
AtomicReference<Throwable> exception = new AtomicReference<>();
CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1);
FanOutShardSubscription subscription = new FanOutShardSubscription(waitForSubscriptionLatch);
SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
.builder()
.onError(e -> {
// Errors that occur while trying to acquire a subscription are only thrown from here
// Errors that occur during the subscription are surfaced here and to the FanOutShardSubscription
// (errors are ignored here once the subscription is open)
if (waitForSubscriptionLatch.getCount() > 0) {
exception.set(e);
waitForSubscriptionLatch.countDown();
}
})
.subscriber(() -> subscription)
.build();
kinesis.subscribeToShard(request, responseHandler);
boolean subscriptionEstablished = waitForSubscriptionLatch
.await(subscribeToShardTimeout.toMillis(), TimeUnit.MILLISECONDS);
if (!subscriptionEstablished) {
final String errorMessage = "Timed out acquiring subscription - " + shardId + " (" + consumerArn + ")";
LOG.error(errorMessage);
subscription.cancelSubscription();
handleError(new RecoverableFanOutSubscriberException(new TimeoutException(errorMessage)));
}
Throwable throwable = exception.get();
if (throwable != null) {
handleError(throwable);
}
LOG.debug("Acquired subscription - {} ({})", shardId, consumerArn);
// Request the first record to kick off consumption
// Following requests are made by the FanOutShardSubscription on the netty thread
subscription.requestRecord();
return subscription;
}