private FanOutShardSubscription openSubscriptionToShard()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java [231:288]


    /**
     * Issues a {@code subscribeToShard} call for this subscriber's shard and consumer, then blocks
     * until the subscription latch is released or {@code subscribeToShardTimeout} elapses.
     *
     * <p>If the wait times out, the subscription is cancelled and a {@link TimeoutException} is
     * handed to {@code handleErrorAndRethrow}. If the response handler reported an error while the
     * latch was still pending, that error is handed to {@code handleErrorAndRethrow} as well.
     * Otherwise the first record is requested and the subscription is returned.
     *
     * @param startingPosition the starting position placed on the subscribe request
     * @return the subscription, after the first record has been requested from it
     * @throws FanOutSubscriberException declared by this method; presumably raised through {@code
     *     handleErrorAndRethrow}, whose body is not visible here
     * @throws InterruptedException if the calling thread is interrupted while awaiting the latch
     */
    private FanOutShardSubscription openSubscriptionToShard(final StartingPosition startingPosition)
            throws FanOutSubscriberException, InterruptedException {
        SubscribeToShardRequest subscribeRequest =
                SubscribeToShardRequest.builder()
                        .consumerARN(consumerArn)
                        .shardId(shardId)
                        .startingPosition(startingPosition)
                        .build();

        // Holds an error reported by the handler while the subscription is still being acquired
        AtomicReference<Throwable> acquisitionError = new AtomicReference<>();
        CountDownLatch subscriptionLatch = new CountDownLatch(1);
        FanOutShardSubscription shardSubscription = new FanOutShardSubscription(subscriptionLatch);

        SubscribeToShardResponseHandler handler =
                SubscribeToShardResponseHandler.builder()
                        .onError(
                                error -> {
                                    // Once the latch has been released the subscription is
                                    // open: errors from that point on are also surfaced to the
                                    // FanOutShardSubscription, so they are ignored here
                                    if (subscriptionLatch.getCount() <= 0) {
                                        return;
                                    }

                                    // Still acquiring: record the failure and release the
                                    // waiting caller so it can be thrown from this method
                                    acquisitionError.set(error);
                                    subscriptionLatch.countDown();
                                })
                        .subscriber(() -> shardSubscription)
                        .build();

        kinesis.subscribeToShard(subscribeRequest, handler);

        final long timeoutMillis = subscribeToShardTimeout.toMillis();
        boolean latchReleasedInTime =
                subscriptionLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);

        if (!latchReleasedInTime) {
            final String timeoutMessage =
                    "Timed out acquiring subscription - " + shardId + " (" + consumerArn + ")";
            LOG.error(timeoutMessage);
            shardSubscription.cancelSubscription();
            handleErrorAndRethrow(new TimeoutException(timeoutMessage));
        }

        Throwable reportedError = acquisitionError.get();
        if (reportedError != null) {
            handleErrorAndRethrow(reportedError);
        }

        LOG.debug("Acquired subscription - {} ({})", shardId, consumerArn);

        // Kick off consumption by asking for the first record; subsequent requests are made by
        // the FanOutShardSubscription on the netty thread
        shardSubscription.requestRecord();

        return shardSubscription;
    }